Compare commits

..

14 Commits

Author SHA1 Message Date
John Spray
50118e0347 tests: update shard split smoke 2024-12-06 17:58:18 +00:00
John Spray
f943fa5753 DNM: cargo: add test-log 2024-12-05 17:35:19 +00:00
John Spray
9172476147 storcon: update migration kick for multiple secondaries 2024-12-05 17:35:16 +00:00
John Spray
e3b8a3ee57 mk3 optimization 2024-12-05 17:33:01 +00:00
John Spray
5bb7472eab storcon: allow reconciles to succeed for down secondaries 2024-12-05 17:33:01 +00:00
John Spray
0d6966b9cb Take 2: Optimization, many_tenants now passes 2024-12-05 17:33:01 +00:00
John Spray
44689fd90b DNM: small scale test 2024-12-05 17:33:01 +00:00
John Spray
43f55136b6 DNM: reduce max optimisations to simplify testing 2024-12-05 17:33:01 +00:00
John Spray
9971913fa0 storcon: add test for scheduling stability 2024-12-05 17:33:01 +00:00
John Spray
01643006c6 optimisation change 2024-12-05 17:32:50 +00:00
John Spray
ea6f6a87a2 tests: update test_shard_preferred_azs
It can now assert that shards get moved to their preferred AZs
2024-12-05 17:31:13 +00:00
John Spray
017fffe583 storcon: change Attached policy to mean _at least_ N secondaries 2024-12-05 17:31:13 +00:00
John Spray
cf896ff144 storcon: prioritize AZ over affinity in scheduler 2024-12-05 17:31:13 +00:00
John Spray
bcd888126e storcon: AZ awareness when filling nodes 2024-12-05 17:31:13 +00:00
97 changed files with 1928 additions and 7522 deletions

View File

@@ -21,5 +21,3 @@ config-variables:
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
- DEV_AWS_OIDC_ROLE_ARN
- BENCHMARK_INGEST_TARGET_PROJECTID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID

View File

@@ -135,7 +135,7 @@ runs:
fi
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
# -n sets the number of parallel processes that pytest-xdist will run
EXTRA_PARAMS="-n8 $EXTRA_PARAMS"
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
# to the same worker to make @pytest.mark.order work with xdist

View File

@@ -23,14 +23,11 @@ jobs:
regress:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 16
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
strategy:
fail-fast: false
matrix:
pg-version: [16, 17]
runs-on: us-east-2
container:
@@ -43,11 +40,9 @@ jobs:
submodules: true
- name: Patch the test
env:
PG_VERSION: ${{matrix.pg-version}}
run: |
cd "vendor/postgres-v${PG_VERSION}"
patch -p1 < "../../compute/patches/cloud_regress_pg${PG_VERSION}.patch"
cd "vendor/postgres-v${DEFAULT_PG_VERSION}"
patch -p1 < "../../compute/patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch"
- name: Generate a random password
id: pwgen
@@ -60,9 +55,8 @@ jobs:
- name: Change tests according to the generated password
env:
DBPASS: ${{ steps.pwgen.outputs.DBPASS }}
PG_VERSION: ${{matrix.pg-version}}
run: |
cd vendor/postgres-v"${PG_VERSION}"/src/test/regress
cd vendor/postgres-v"${DEFAULT_PG_VERSION}"/src/test/regress
for fname in sql/*.sql expected/*.out; do
sed -i.bak s/NEON_PASSWORD_PLACEHOLDER/"'${DBPASS}'"/ "${fname}"
done
@@ -79,29 +73,15 @@ jobs:
path: /tmp/neon/
prefix: latest
- name: Create a new branch
id: create-branch
uses: ./.github/actions/neon-branch-create
with:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
project_id: ${{ vars[format('PGREGRESS_PG{0}_PROJECT_ID', matrix.pg-version)] }}
- name: Run the regression tests
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: cloud_regress
pg_version: ${{matrix.pg-version}}
pg_version: ${{ env.DEFAULT_PG_VERSION }}
extra_params: -m remote_cluster
env:
BENCHMARK_CONNSTR: ${{steps.create-branch.outputs.dsn}}
- name: Delete branch
uses: ./.github/actions/neon-branch-delete
with:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
project_id: ${{ vars[format('PGREGRESS_PG{0}_PROJECT_ID', matrix.pg-version)] }}
branch_id: ${{steps.create-branch.outputs.branch_id}}
BENCHMARK_CONNSTR: ${{ secrets.PG_REGRESS_CONNSTR }}
- name: Create Allure report
id: create-allure-report

View File

@@ -1,29 +1,16 @@
# Autoscaling
/libs/vm_monitor/ @neondatabase/autoscaling
# DevProd
/.github/ @neondatabase/developer-productivity
# Compute
/pgxn/ @neondatabase/compute
/vendor/ @neondatabase/compute
/compute/ @neondatabase/compute
/compute_tools/ @neondatabase/compute
# Proxy
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
/libs/pageserver_api/ @neondatabase/storage
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
/libs/proxy/ @neondatabase/proxy
/proxy/ @neondatabase/proxy
# Storage
/libs/remote_storage/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/storage
/libs/vm_monitor/ @neondatabase/autoscaling
/pageserver/ @neondatabase/storage
/pgxn/ @neondatabase/compute
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
/proxy/ @neondatabase/proxy
/safekeeper/ @neondatabase/storage
/storage_controller @neondatabase/storage
/storage_scrubber @neondatabase/storage
/libs/pageserver_api/ @neondatabase/storage
/libs/remote_storage/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/storage
# Shared
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
/libs/compute_api/ @neondatabase/compute @neondatabase/control-plane
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
/vendor/ @neondatabase/compute

588
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -51,6 +51,10 @@ anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
flate2 = "1.0.26"
async-stream = "0.3"
async-trait = "0.1"
@@ -212,12 +216,6 @@ postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git",
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
## Azure SDK crates
azure_core = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
azure_identity = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }

View File

@@ -115,7 +115,7 @@ RUN set -e \
# Keep the version the same as in compute/compute-node.Dockerfile and
# test_runner/regress/test_compute_metrics.py.
ENV SQL_EXPORTER_VERSION=0.16.0
ENV SQL_EXPORTER_VERSION=0.13.1
RUN curl -fsSL \
"https://github.com/burningalchemist/sql_exporter/releases/download/${SQL_EXPORTER_VERSION}/sql_exporter-${SQL_EXPORTER_VERSION}.linux-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac).tar.gz" \
--output sql_exporter.tar.gz \

View File

@@ -1324,7 +1324,7 @@ FROM quay.io/prometheuscommunity/postgres-exporter:v0.12.1 AS postgres-exporter
# Keep the version the same as in build-tools.Dockerfile and
# test_runner/regress/test_compute_metrics.py.
FROM burningalchemist/sql_exporter:0.16.0 AS sql-exporter
FROM burningalchemist/sql_exporter:0.13.1 AS sql-exporter
#########################################################################################
#

View File

@@ -6,7 +6,6 @@
import 'sql_exporter/compute_backpressure_throttling_seconds.libsonnet',
import 'sql_exporter/compute_current_lsn.libsonnet',
import 'sql_exporter/compute_logical_snapshot_files.libsonnet',
import 'sql_exporter/compute_logical_snapshots_bytes.libsonnet',
import 'sql_exporter/compute_max_connections.libsonnet',
import 'sql_exporter/compute_receive_lsn.libsonnet',
import 'sql_exporter/compute_subscriptions_count.libsonnet',

View File

@@ -1,7 +0,0 @@
SELECT
(SELECT current_setting('neon.timeline_id')) AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(sum(size), 0) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;

View File

@@ -1,17 +0,0 @@
local neon = import 'neon.libsonnet';
local pg_ls_logicalsnapdir = importstr 'sql_exporter/compute_logical_snapshots_bytes.15.sql';
local pg_ls_dir = importstr 'sql_exporter/compute_logical_snapshots_bytes.sql';
{
metric_name: 'compute_logical_snapshots_bytes',
type: 'gauge',
help: 'Size of the pg_logical/snapshots directory, not including temporary files',
key_labels: [
'timeline_id',
],
values: [
'logical_snapshots_bytes',
],
query: if neon.PG_MAJORVERSION_NUM < 15 then pg_ls_dir else pg_ls_logicalsnapdir,
}

View File

@@ -1,9 +0,0 @@
SELECT
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(sum((pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
) AS logical_snapshots_bytes;

File diff suppressed because it is too large Load Diff

View File

@@ -1243,7 +1243,12 @@ impl ComputeNode {
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
config::write_postgres_conf(&postgresql_conf_path, &spec, self.http_port)?;
let max_concurrent_connections = spec.reconfigure_concurrency;
// TODO(ololobus): We need a concurrency during reconfiguration as well,
// but DB is already running and used by user. We can easily get out of
// `max_connections` limit, and the current code won't handle that.
// let compute_state = self.state.lock().unwrap().clone();
// let max_concurrent_connections = self.max_service_connections(&compute_state, &spec);
let max_concurrent_connections = 1;
// Temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:

View File

@@ -53,7 +53,6 @@ use compute_api::spec::Role;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -619,7 +618,6 @@ impl Endpoint {
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: 1,
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
@@ -819,7 +817,6 @@ impl Endpoint {
self.http_address.ip(),
self.http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.body(format!(
"{{\"spec\":{}}}",
serde_json::to_string_pretty(&spec)?

View File

@@ -42,7 +42,6 @@ allow = [
"MPL-2.0",
"OpenSSL",
"Unicode-DFS-2016",
"Unicode-3.0",
]
confidence-threshold = 0.8
exceptions = [

View File

@@ -19,10 +19,6 @@ pub type PgIdent = String;
/// String type alias representing Postgres extension version
pub type ExtVersion = String;
fn default_reconfigure_concurrency() -> usize {
1
}
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
@@ -71,7 +67,7 @@ pub struct ComputeSpec {
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
/// An optional hint that can be passed to speed up startup time if we know
/// An optinal hint that can be passed to speed up startup time if we know
/// that no pg catalog mutations (like role creation, database creation,
/// extension creation) need to be done on the actual database to start.
#[serde(default)] // Default false
@@ -90,7 +86,9 @@ pub struct ComputeSpec {
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
// updated to fill these fields, we can make these non optional.
pub tenant_id: Option<TenantId>,
pub timeline_id: Option<TimelineId>,
pub pageserver_connstring: Option<String>,
#[serde(default)]
@@ -115,20 +113,6 @@ pub struct ComputeSpec {
/// Local Proxy configuration used for JWT authentication
#[serde(default)]
pub local_proxy_config: Option<LocalProxySpec>,
/// Number of concurrent connections during the parallel RunInEachDatabase
/// phase of the apply config process.
///
/// We need a higher concurrency during reconfiguration in case of many DBs,
/// but instance is already running and used by client. We can easily get out of
/// `max_connections` limit, and the current code won't handle that.
///
/// Default is 1, but also allow control plane to override this value for specific
/// projects. It's also recommended to bump `superuser_reserved_connections` +=
/// `reconfigure_concurrency` for such projects to ensure that we always have
/// enough spare connections for reconfiguration process to succeed.
#[serde(default = "default_reconfigure_concurrency")]
pub reconfigure_concurrency: usize,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
@@ -331,9 +315,6 @@ mod tests {
// Features list defaults to empty vector.
assert!(spec.features.is_empty());
// Reconfigure concurrency defaults to 1.
assert_eq!(spec.reconfigure_concurrency, 1);
}
#[test]

View File

@@ -245,17 +245,6 @@ impl From<NodeAvailability> for NodeAvailabilityWrapper {
}
}
/// Scheduling policy enables us to selectively disable some automatic actions that the
/// controller performs on a tenant shard. This is only set to a non-default value by
/// human intervention, and it is reset to the default value (Active) when the tenant's
/// placement policy is modified away from Attached.
///
/// The typical use of a non-Active scheduling policy is one of:
/// - Pinnning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
/// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
///
/// If you're not sure which policy to use to pin a shard to its current location, you probably
/// want Pause.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
// Normal mode: the tenant's scheduled locations may be updated at will, including
@@ -336,6 +325,16 @@ pub enum PlacementPolicy {
Detached,
}
impl PlacementPolicy {
pub fn want_secondaries(&self) -> usize {
match self {
PlacementPolicy::Attached(secondary_count) => *secondary_count,
PlacementPolicy::Secondary => 1,
PlacementPolicy::Detached => 0,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}

View File

@@ -158,8 +158,7 @@ impl ShardIdentity {
key_to_shard_number(self.count, self.stripe_size, key)
}
/// Return true if the key is stored only on this shard. This does not include
/// global keys, see is_key_global().
/// Return true if the key should be ingested by this shard
///
/// Shards must ingest _at least_ keys which return true from this check.
pub fn is_key_local(&self, key: &Key) -> bool {
@@ -172,7 +171,7 @@ impl ShardIdentity {
}
/// Return true if the key should be stored on all shards, not just one.
pub fn is_key_global(&self, key: &Key) -> bool {
fn is_key_global(&self, key: &Key) -> bool {
if key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key() {
// Special keys that are only stored on shard 0
false

View File

@@ -8,14 +8,15 @@ use std::io;
use std::num::NonZeroU32;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
use anyhow::Context;
use anyhow::Result;
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::{Continuable, RetryOptions};
use azure_identity::DefaultAzureCredential;
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::CopyStatus;
use azure_storage_blobs::prelude::ClientBuilder;
@@ -75,9 +76,8 @@ impl AzureBlobStorage {
let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
StorageCredentials::access_key(account.clone(), access_key)
} else {
let token_credential = azure_identity::create_default_credential()
.context("trying to obtain Azure default credentials")?;
StorageCredentials::token_credential(token_credential)
let token_credential = DefaultAzureCredential::default();
StorageCredentials::token_credential(Arc::new(token_credential))
};
// we have an outer retry
@@ -624,10 +624,6 @@ impl RemoteStorage for AzureBlobStorage {
res
}
fn max_keys_per_delete(&self) -> usize {
super::MAX_KEYS_PER_DELETE_AZURE
}
async fn copy(
&self,
from: &RemotePath,

View File

@@ -70,14 +70,7 @@ pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
/// As defined in S3 docs
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
/// As defined in Azure docs
///
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
pub const MAX_KEYS_PER_DELETE: usize = 1000;
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
@@ -347,14 +340,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
cancel: &CancellationToken,
) -> anyhow::Result<()>;
/// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking
///
/// The value returned is only an optimization hint, One can pass larger number of objects to
/// `delete_objects` as well.
///
/// The value is guaranteed to be >= 1.
fn max_keys_per_delete(&self) -> usize;
/// Deletes all objects matching the given prefix.
///
/// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
@@ -548,16 +533,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
/// [`RemoteStorage::max_keys_per_delete`]
pub fn max_keys_per_delete(&self) -> usize {
match self {
Self::LocalFs(s) => s.max_keys_per_delete(),
Self::AwsS3(s) => s.max_keys_per_delete(),
Self::AzureBlob(s) => s.max_keys_per_delete(),
Self::Unreliable(s) => s.max_keys_per_delete(),
}
}
/// See [`RemoteStorage::delete_prefix`]
pub async fn delete_prefix(
&self,

View File

@@ -573,10 +573,6 @@ impl RemoteStorage for LocalFs {
Ok(())
}
fn max_keys_per_delete(&self) -> usize {
super::MAX_KEYS_PER_DELETE_S3
}
async fn copy(
&self,
from: &RemotePath,

View File

@@ -48,7 +48,7 @@ use crate::{
metrics::{start_counting_cancelled_wait, start_measuring_requests},
support::PermitCarrying,
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE_S3,
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
REMOTE_STORAGE_PREFIX_SEPARATOR,
};
@@ -355,7 +355,7 @@ impl S3Bucket {
let kind = RequestKind::Delete;
let mut cancel = std::pin::pin!(cancel.cancelled());
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
let started_at = start_measuring_requests(kind);
let req = self
@@ -832,10 +832,6 @@ impl RemoteStorage for S3Bucket {
self.delete_oids(&permit, &delete_objects, cancel).await
}
fn max_keys_per_delete(&self) -> usize {
MAX_KEYS_PER_DELETE_S3
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
let paths = std::array::from_ref(path);
self.delete_objects(paths, cancel).await

View File

@@ -203,10 +203,6 @@ impl RemoteStorage for UnreliableWrapper {
Ok(())
}
fn max_keys_per_delete(&self) -> usize {
self.inner.max_keys_per_delete()
}
async fn copy(
&self,
from: &RemotePath,

View File

@@ -164,12 +164,6 @@ impl TenantShardId {
}
}
impl std::fmt::Display for ShardNumber {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::fmt::Display for ShardSlug<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(

View File

@@ -9,6 +9,7 @@
use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::TimeoutOrCancel;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
@@ -130,8 +131,7 @@ impl Deleter {
}
pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
let max_keys_per_delete = self.remote_storage.max_keys_per_delete();
self.accumulator.reserve(max_keys_per_delete);
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
loop {
if self.cancel.is_cancelled() {
@@ -156,14 +156,14 @@ impl Deleter {
match msg {
DeleterMessage::Delete(mut list) => {
while !list.is_empty() || self.accumulator.len() == max_keys_per_delete {
if self.accumulator.len() == max_keys_per_delete {
while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
self.flush().await?;
// If we have received this number of keys, proceed with attempting to execute
assert_eq!(self.accumulator.len(), 0);
}
let available_slots = max_keys_per_delete - self.accumulator.len();
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
let take_count = std::cmp::min(available_slots, list.len());
for path in list.drain(list.len() - take_count..) {
self.accumulator.push(path);

View File

@@ -87,7 +87,7 @@ use crate::tenant::timeline::offload::offload_timeline;
use crate::tenant::timeline::offload::OffloadError;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactOptions;
use crate::tenant::timeline::CompactRequest;
use crate::tenant::timeline::CompactRange;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
@@ -1978,26 +1978,6 @@ async fn timeline_gc_handler(
json_response(StatusCode::OK, gc_result)
}
// Cancel scheduled compaction tasks
async fn timeline_cancel_compact_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.cancel_scheduled_compaction(timeline_id);
json_response(StatusCode::OK, ())
}
.instrument(info_span!("timeline_cancel_compact", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}
// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
mut request: Request<Body>,
@@ -2007,7 +1987,7 @@ async fn timeline_compact_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let compact_request = json_request_maybe::<Option<CompactRequest>>(&mut request).await?;
let compact_range = json_request_maybe::<Option<CompactRange>>(&mut request).await?;
let state = get_state(&request);
@@ -2032,50 +2012,22 @@ async fn timeline_compact_handler(
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
let wait_until_scheduled_compaction_done =
parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
.unwrap_or(false);
let sub_compaction = compact_request
.as_ref()
.map(|r| r.sub_compaction)
.unwrap_or(false);
let options = CompactOptions {
compact_range: compact_request
.as_ref()
.and_then(|r| r.compact_range.clone()),
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
compact_range,
flags,
sub_compaction,
};
let scheduled = compact_request
.as_ref()
.map(|r| r.scheduled)
.unwrap_or(false);
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
if scheduled {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let rx = tenant.schedule_compaction(timeline_id, options).await.map_err(ApiError::InternalServerError)?;
if wait_until_scheduled_compaction_done {
// It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
rx.await.ok();
}
} else {
timeline
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await
// XXX map to correct ApiError for the cases where it's due to shutdown
.context("wait completion").map_err(ApiError::InternalServerError)?;
}
timeline
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await
// XXX map to correct ApiError for the cases where it's due to shutdown
.context("wait completion").map_err(ApiError::InternalServerError)?;
}
json_response(StatusCode::OK, ())
}
@@ -2156,20 +2108,16 @@ async fn timeline_checkpoint_handler(
// By default, checkpoints come with a compaction, but this may be optionally disabled by tests that just want to flush + upload.
let compact = parse_query_param::<_, bool>(&request, "compact")?.unwrap_or(true);
let wait_until_flushed: bool =
parse_query_param(&request, "wait_until_flushed")?.unwrap_or(true);
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
if wait_until_flushed {
timeline.freeze_and_flush().await
} else {
timeline.freeze().await.and(Ok(()))
}.map_err(|e| {
timeline
.freeze_and_flush()
.await
.map_err(|e| {
match e {
tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
other => ApiError::InternalServerError(other.into()),
@@ -3353,10 +3301,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_handler),
)
.delete(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_cancel_compact_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload",
|r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler),

View File

@@ -464,24 +464,6 @@ static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static DISK_CONSISTENT_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_disk_consistent_lsn",
"Disk consistent LSN grouped by timeline",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static PROJECTED_REMOTE_CONSISTENT_LSN: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_projected_remote_consistent_lsn",
"Projected remote consistent LSN grouped by timeline",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static PITR_HISTORY_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_pitr_history_size",
@@ -1223,60 +1205,31 @@ pub(crate) mod virtual_file_io_engine {
});
}
pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
pub(crate) struct SmgrOpTimerInner {
pub(crate) struct SmgrOpTimer {
global_latency_histo: Histogram,
// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<Histogram>,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
start: Instant,
throttled: Duration,
op: SmgrQueryType,
}
pub(crate) struct SmgrOpFlushInProgress {
base: Instant,
global_micros: IntCounter,
per_timeline_micros: IntCounter,
}
impl SmgrOpTimer {
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
let Some(throttle) = throttle else {
return;
};
let inner = self.0.as_mut().expect("other public methods consume self");
inner.throttled += *throttle;
self.throttled += *throttle;
}
}
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
let (flush_start, inner) = self
.smgr_op_end()
.expect("this method consume self, and the only other caller is drop handler");
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
SmgrOpFlushInProgress {
base: flush_start,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
}
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
/// Returns `None`` if this method has already been called, `Some` otherwise.
fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
let inner = self.0.take()?;
let now = Instant::now();
let elapsed = now - inner.start;
let elapsed = match elapsed.checked_sub(inner.throttled) {
let elapsed = match elapsed.checked_sub(self.throttled) {
Some(elapsed) => elapsed,
None => {
use utils::rate_limit::RateLimit;
@@ -1287,9 +1240,9 @@ impl SmgrOpTimer {
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[inner.op];
let rate_limit = &mut guard[self.op];
rate_limit.call(|| {
warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
});
elapsed // un-throttled time, more info than just saturating to 0
}
@@ -1297,54 +1250,10 @@ impl SmgrOpTimer {
let elapsed = elapsed.as_secs_f64();
inner.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
self.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(elapsed);
}
Some((now, inner))
}
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
self.smgr_op_end();
}
}
impl SmgrOpFlushInProgress {
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
let now = Instant::now();
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `now`.
let mut observe_guard = scopeguard::guard(
|| {
let elapsed = now - self.base;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.base = now;
},
|mut observe| {
observe();
},
);
loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
}
}
@@ -1375,8 +1284,6 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_getpage_latency: Histogram,
global_batch_size: Histogram,
per_timeline_batch_size: Histogram,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
}
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
@@ -1539,26 +1446,6 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
.set(value.try_into().unwrap());
}
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_service_pagestream_flush_in_progress_micros",
"Counter that sums up the microseconds that a pagestream response was being flushed into the TCP connection. \
If the flush is particularly slow, this counter will be updated periodically to make slow flushes \
easily discoverable in monitoring. \
Hence, this is NOT a completion latency historgram.",
&["tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_page_service_pagestream_flush_in_progress_micros_global",
"Like pageserver_page_service_pagestream_flush_in_progress_seconds, but instance-wide.",
)
.expect("failed to define a metric")
});
impl SmgrQueryTimePerTimeline {
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
@@ -1599,12 +1486,6 @@ impl SmgrQueryTimePerTimeline {
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();
let global_flush_in_progress_micros =
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();
Self {
global_started,
global_latency,
@@ -1612,8 +1493,6 @@ impl SmgrQueryTimePerTimeline {
per_timeline_getpage_started,
global_batch_size,
per_timeline_batch_size,
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
}
}
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
@@ -1626,17 +1505,13 @@ impl SmgrQueryTimePerTimeline {
None
};
SmgrOpTimer(Some(SmgrOpTimerInner {
SmgrOpTimer {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
start: started_at,
op,
throttled: Duration::ZERO,
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
}))
}
}
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
@@ -2311,15 +2186,6 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
.expect("failed to define a metric"),
});
pub(crate) static PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_timeline_wal_records_received",
"Number of WAL records received per shard",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_seconds",
@@ -2528,8 +2394,7 @@ pub(crate) struct TimelineMetrics {
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub find_gc_cutoffs_histo: StorageTimeMetrics,
pub last_record_lsn_gauge: IntGauge,
pub disk_consistent_lsn_gauge: IntGauge,
pub last_record_gauge: IntGauge,
pub pitr_history_size: UIntGauge,
pub archival_size: UIntGauge,
pub(crate) layer_size_image: UIntGauge,
@@ -2547,7 +2412,6 @@ pub(crate) struct TimelineMetrics {
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
/// Number of valid LSN leases.
pub valid_lsn_lease_count_gauge: UIntGauge,
pub wal_records_received: IntCounter,
shutdown: std::sync::atomic::AtomicBool,
}
@@ -2611,11 +2475,7 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let last_record_lsn_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let disk_consistent_lsn_gauge = DISK_CONSISTENT_LSN
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
@@ -2705,10 +2565,6 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let wal_records_received = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
TimelineMetrics {
tenant_id,
shard_id,
@@ -2722,8 +2578,7 @@ impl TimelineMetrics {
garbage_collect_histo,
find_gc_cutoffs_histo,
load_layer_map_histo,
last_record_lsn_gauge,
disk_consistent_lsn_gauge,
last_record_gauge,
pitr_history_size,
archival_size,
layer_size_image,
@@ -2741,7 +2596,6 @@ impl TimelineMetrics {
evictions_with_low_residence_duration,
),
valid_lsn_lease_count_gauge,
wal_records_received,
shutdown: std::sync::atomic::AtomicBool::default(),
}
}
@@ -2788,7 +2642,6 @@ impl TimelineMetrics {
let timeline_id = &self.timeline_id;
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
@@ -2879,16 +2732,6 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
let _ = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
let _ = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
}
}
@@ -2962,7 +2805,6 @@ pub(crate) struct RemoteTimelineClientMetrics {
calls: Mutex<HashMap<(&'static str, &'static str), IntCounterPair>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
pub(crate) projected_remote_consistent_lsn_gauge: UIntGauge,
}
impl RemoteTimelineClientMetrics {
@@ -2977,10 +2819,6 @@ impl RemoteTimelineClientMetrics {
.unwrap(),
);
let projected_remote_consistent_lsn_gauge = PROJECTED_REMOTE_CONSISTENT_LSN
.get_metric_with_label_values(&[&tenant_id_str, &shard_id_str, &timeline_id_str])
.unwrap();
RemoteTimelineClientMetrics {
tenant_id: tenant_id_str,
shard_id: shard_id_str,
@@ -2989,7 +2827,6 @@ impl RemoteTimelineClientMetrics {
bytes_started_counter: Mutex::new(HashMap::default()),
bytes_finished_counter: Mutex::new(HashMap::default()),
remote_physical_size_gauge,
projected_remote_consistent_lsn_gauge,
}
}
@@ -3203,7 +3040,6 @@ impl Drop for RemoteTimelineClientMetrics {
calls,
bytes_started_counter,
bytes_finished_counter,
projected_remote_consistent_lsn_gauge,
} = self;
for ((a, b), _) in calls.get_mut().unwrap().drain() {
let mut res = [Ok(()), Ok(())];
@@ -3233,14 +3069,6 @@ impl Drop for RemoteTimelineClientMetrics {
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
}
{
let _ = projected_remote_consistent_lsn_gauge;
let _ = PROJECTED_REMOTE_CONSISTENT_LSN.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
}
}
}

View File

@@ -1017,8 +1017,10 @@ impl PageServerHandler {
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let mut timers: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(handler_results.len());
for handler_result in handler_results {
let (response_msg, timer) = match handler_result {
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1042,66 +1044,34 @@ impl PageServerHandler {
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
(
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
}),
None, // TODO: measure errors
)
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok((response_msg, timer)) => (response_msg, Some(timer)),
Ok((response_msg, timer)) => {
// Extending the lifetime of the timers so observations on drop
// include the flush time.
timers.push(timer);
response_msg
}
};
//
// marshal & transmit response message
//
pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
// We purposefully don't count flush time into the timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer =
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
};
// do it while respecting cancellation
let _: () = async move {
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = flush_fut => {
res?;
}
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = pgb_writer.flush() => {
res?;
}
}
drop(timers);
Ok(())
}

View File

@@ -37,19 +37,14 @@ use remote_timeline_client::manifest::{
};
use remote_timeline_client::UploadQueueNotReadyError;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::compaction::ScheduledCompactionTask;
use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::CompactionError;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -344,11 +339,6 @@ pub struct Tenant {
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
/// Scheduled compaction tasks. Currently, this can only be populated by triggering
/// a manual gc-compaction from the manual compaction API.
scheduled_compaction_tasks:
std::sync::Mutex<HashMap<TimelineId, VecDeque<ScheduledCompactionTask>>>,
/// If the tenant is in Activating state, notify this to encourage it
/// to proceed to Active as soon as possible, rather than waiting for lazy
/// background warmup.
@@ -2963,109 +2953,27 @@ impl Tenant {
for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload
{
// pending_task_left == None: cannot compact, maybe still pending tasks
// pending_task_left == Some(true): compaction task left
// pending_task_left == Some(false): no compaction task left
let pending_task_left = if *can_compact {
let has_pending_l0_compaction_task = timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
.inspect_err(|e| match e {
timeline::CompactionError::ShuttingDown => (),
timeline::CompactionError::Offload(_) => {
// Failures to offload timelines do not trip the circuit breaker, because
// they do not do lots of writes the way compaction itself does: it is cheap
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
}
timeline::CompactionError::Other(e) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
}
})?;
if has_pending_l0_compaction_task {
Some(true)
} else {
let mut has_pending_scheduled_compaction_task;
let next_scheduled_compaction_task = {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
if !tline_pending_tasks.is_empty() {
info!(
"{} tasks left in the compaction schedule queue",
tline_pending_tasks.len()
);
Some(
timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
.inspect_err(|e| match e {
timeline::CompactionError::ShuttingDown => (),
timeline::CompactionError::Offload(_) => {
// Failures to offload timelines do not trip the circuit breaker, because
// they do not do lots of writes the way compaction itself does: it is cheap
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
}
let next_task = tline_pending_tasks.pop_front();
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
next_task
} else {
has_pending_scheduled_compaction_task = false;
None
}
};
if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task
{
if !next_scheduled_compaction_task
.options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
} else if next_scheduled_compaction_task.options.sub_compaction {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = timeline
.gc_compaction_split_jobs(next_scheduled_compaction_task.options)
.await
.map_err(CompactionError::Other)?;
if jobs.is_empty() {
info!("no jobs to run, skipping scheduled compaction task");
} else {
has_pending_scheduled_compaction_task = true;
let jobs_len = jobs.len();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
ScheduledCompactionTask {
options: job,
// The last job in the queue sends the signal and releases the gc guard
result_tx: next_scheduled_compaction_task
.result_tx
.take(),
gc_block: next_scheduled_compaction_task
.gc_block
.take(),
}
} else {
ScheduledCompactionTask {
options: job,
result_tx: None,
gc_block: None,
}
});
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
timeline::CompactionError::Other(e) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
}
} else {
let _ = timeline
.compact_with_options(
cancel,
next_scheduled_compaction_task.options,
ctx,
)
.instrument(info_span!("scheduled_compact_timeline", %timeline_id))
.await?;
if let Some(tx) = next_scheduled_compaction_task.result_tx.take() {
// TODO: we can send compaction statistics in the future
tx.send(()).ok();
}
}
}
Some(has_pending_scheduled_compaction_task)
}
})?,
)
} else {
None
};
@@ -3085,43 +2993,6 @@ impl Tenant {
Ok(has_pending_task)
}
/// Cancel scheduled compaction tasks
pub(crate) fn cancel_scheduled_compaction(
&self,
timeline_id: TimelineId,
) -> Vec<ScheduledCompactionTask> {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) {
let current_tline_pending_tasks = std::mem::take(tline_pending_tasks);
current_tline_pending_tasks.into_iter().collect()
} else {
Vec::new()
}
}
/// Schedule a compaction task for a timeline.
pub(crate) async fn schedule_compaction(
&self,
timeline_id: TimelineId,
options: CompactOptions,
) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
let gc_guard = match self.gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
bail!("cannot run gc-compaction because gc is blocked: {}", e);
}
};
let (tx, rx) = tokio::sync::oneshot::channel();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(timeline_id).or_default();
tline_pending_tasks.push_back(ScheduledCompactionTask {
options,
result_tx: Some(tx),
gc_block: Some(gc_guard),
});
Ok(rx)
}
// Call through to all timelines to freeze ephemeral layers if needed. Usually
// this happens during ingest: this background housekeeping is for freezing layers
// that are open but haven't been written to for some time.
@@ -4134,7 +4005,6 @@ impl Tenant {
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
)),
scheduled_compaction_tasks: Mutex::new(Default::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
cancel: CancellationToken::default(),
@@ -4506,12 +4376,7 @@ impl Tenant {
// - this timeline was created while we were finding cutoffs
// - lsn for timestamp search fails for this timeline repeatedly
if let Some(cutoffs) = gc_cutoffs.get(&timeline.timeline_id) {
let original_cutoffs = target.cutoffs.clone();
// GC cutoffs should never go back
target.cutoffs = GcCutoffs {
space: Lsn(cutoffs.space.0.max(original_cutoffs.space.0)),
time: Lsn(cutoffs.time.0.max(original_cutoffs.time.0)),
}
target.cutoffs = cutoffs.clone();
}
}
@@ -8171,12 +8036,6 @@ mod tests {
)
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x30);
@@ -8279,12 +8138,6 @@ mod tests {
// increase GC horizon and compact again
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
@@ -8665,12 +8518,6 @@ mod tests {
.await?
};
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -8752,12 +8599,6 @@ mod tests {
// increase GC horizon and compact again
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x40))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
@@ -9205,12 +9046,6 @@ mod tests {
)
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -9328,7 +9163,6 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
..Default::default()
},
&ctx,
)
@@ -9353,12 +9187,6 @@ mod tests {
// increase GC horizon and compact again
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x38))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x38);
@@ -9454,12 +9282,6 @@ mod tests {
)
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -9577,7 +9399,6 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
..Default::default()
},
&ctx,
)
@@ -9704,12 +9525,6 @@ mod tests {
branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));
{
parent_tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x10))
.wait()
.await;
// Update GC info
let mut guard = parent_tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -9724,12 +9539,6 @@ mod tests {
}
{
branch_tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x50))
.wait()
.await;
// Update GC info
let mut guard = branch_tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -10059,12 +9868,6 @@ mod tests {
.await?;
{
tline
.latest_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x30))
.wait()
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
@@ -10082,15 +9885,7 @@ mod tests {
// Do a partial compaction on key range 0..2
tline
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(2)).into()),
..Default::default()
},
&ctx,
)
.partial_compact_with_gc(get_key(0)..get_key(2), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
@@ -10129,15 +9924,7 @@ mod tests {
// Do a partial compaction on key range 2..4
tline
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(2)..get_key(4)).into()),
..Default::default()
},
&ctx,
)
.partial_compact_with_gc(get_key(2)..get_key(4), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
@@ -10181,15 +9968,7 @@ mod tests {
// Do a partial compaction on key range 4..9
tline
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(4)..get_key(9)).into()),
..Default::default()
},
&ctx,
)
.partial_compact_with_gc(get_key(4)..get_key(9), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
@@ -10232,15 +10011,7 @@ mod tests {
// Do a partial compaction on key range 9..10
tline
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(9)..get_key(10)).into()),
..Default::default()
},
&ctx,
)
.partial_compact_with_gc(get_key(9)..get_key(10), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
@@ -10288,15 +10059,7 @@ mod tests {
// Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones.
tline
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(10)).into()),
..Default::default()
},
&ctx,
)
.partial_compact_with_gc(get_key(0)..get_key(10), &cancel, EnumSet::new(), &ctx)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;
use utils::id::TimelineId;
@@ -20,7 +20,7 @@ pub(crate) struct GcBlock {
/// Do not add any more features taking and forbidding taking this lock. It should be
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
/// synchronizes with gc attempts by locking and unlocking this mutex.
blocking: Arc<tokio::sync::Mutex<()>>,
blocking: tokio::sync::Mutex<()>,
}
impl GcBlock {
@@ -30,7 +30,7 @@ impl GcBlock {
/// it's ending, or if not currently possible, a value describing the reasons why not.
///
/// Cancellation safe.
pub(super) async fn start(&self) -> Result<Guard, BlockingReasons> {
pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
let reasons = {
let g = self.reasons.lock().unwrap();
@@ -44,7 +44,7 @@ impl GcBlock {
Err(reasons)
} else {
Ok(Guard {
_inner: self.blocking.clone().lock_owned().await,
_inner: self.blocking.lock().await,
})
}
}
@@ -170,8 +170,8 @@ impl GcBlock {
}
}
pub(crate) struct Guard {
_inner: tokio::sync::OwnedMutexGuard<()>,
pub(super) struct Guard<'a> {
_inner: tokio::sync::MutexGuard<'a, ()>,
}
#[derive(Debug)]

View File

@@ -2192,9 +2192,6 @@ impl RemoteTimelineClient {
upload_queue.clean.1 = Some(task.task_id);
let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
self.metrics
.projected_remote_consistent_lsn_gauge
.set(lsn.0);
if self.generation.is_none() {
// Legacy mode: skip validating generation

View File

@@ -6,6 +6,7 @@
use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use anyhow::{anyhow, Context};
@@ -26,7 +27,7 @@ use crate::span::{
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::tenant::Generation;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{on_fatal_io_error, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
@@ -149,7 +150,7 @@ async fn download_object<'a>(
storage: &'a GenericRemoteStorage,
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
gate: &utils::sync::gate::Gate,
cancel: &CancellationToken,
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
) -> Result<u64, DownloadError> {
@@ -208,8 +209,6 @@ async fn download_object<'a>(
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io;
use crate::virtual_file::IoBufferMut;
use std::sync::Arc;
async {
let destination_file = Arc::new(
VirtualFile::create(dst_path, ctx)

View File

@@ -53,7 +53,7 @@ use utils::{
postgres_client::PostgresClientProtocol,
sync::gate::{Gate, GateGuard},
};
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use wal_decoder::serialized_batch::SerializedValueBatch;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
@@ -768,7 +768,7 @@ pub enum GetLogicalSizePriority {
Background,
}
#[derive(Debug, enumset::EnumSetType)]
#[derive(enumset::EnumSetType)]
pub(crate) enum CompactFlags {
ForceRepartition,
ForceImageLayerCreation,
@@ -777,19 +777,6 @@ pub(crate) enum CompactFlags {
DryRun,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactRequest {
pub compact_range: Option<CompactRange>,
pub compact_below_lsn: Option<Lsn>,
/// Whether the compaction job should be scheduled.
#[serde(default)]
pub scheduled: bool,
/// Whether the compaction job should be split across key ranges.
#[serde(default)]
pub sub_compaction: bool,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactRange {
@@ -799,27 +786,10 @@ pub(crate) struct CompactRange {
pub end: Key,
}
impl From<Range<Key>> for CompactRange {
fn from(range: Range<Key>) -> Self {
CompactRange {
start: range.start,
end: range.end,
}
}
}
#[derive(Debug, Clone, Default)]
#[derive(Clone, Default)]
pub(crate) struct CompactOptions {
pub flags: EnumSet<CompactFlags>,
/// If set, the compaction will only compact the key range specified by this option.
/// This option is only used by GC compaction.
pub compact_range: Option<CompactRange>,
/// If set, the compaction will only compact the LSN below this value.
/// This option is only used by GC compaction.
pub compact_below_lsn: Option<Lsn>,
/// Enable sub-compaction (split compaction job across key ranges).
/// This option is only used by GC compaction.
pub sub_compaction: bool,
}
impl std::fmt::Debug for Timeline {
@@ -1463,31 +1433,23 @@ impl Timeline {
Ok(lease)
}
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
/// Returns the flush request ID which can be awaited with wait_flush_completion().
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub(crate) async fn freeze(&self) -> Result<u64, FlushLayerError> {
self.freeze0().await
}
/// Freeze and flush the open in-memory layer, waiting for it to be written to disk.
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub(crate) async fn freeze_and_flush(&self) -> Result<(), FlushLayerError> {
self.freeze_and_flush0().await
}
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
/// Returns the flush request ID which can be awaited with wait_flush_completion().
pub(crate) async fn freeze0(&self) -> Result<u64, FlushLayerError> {
let mut g = self.write_lock.lock().await;
let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn, &mut g).await
}
// This exists to provide a non-span creating version of `freeze_and_flush` we can call without
// polluting the span hierarchy.
pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
let token = self.freeze0().await?;
let token = {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let mut g = self.write_lock.lock().await;
let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn, &mut g).await?
};
self.wait_flush_completion(token).await
}
@@ -1642,8 +1604,6 @@ impl Timeline {
CompactOptions {
flags,
compact_range: None,
compact_below_lsn: None,
sub_compaction: false,
},
ctx,
)
@@ -2399,7 +2359,7 @@ impl Timeline {
result
.metrics
.last_record_lsn_gauge
.last_record_gauge
.set(disk_consistent_lsn.0 as i64);
result
})
@@ -3521,7 +3481,7 @@ impl Timeline {
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());
self.metrics.last_record_lsn_gauge.set(new_lsn.0 as i64);
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
self.last_record_lsn.advance(new_lsn);
}
@@ -3889,10 +3849,6 @@ impl Timeline {
fn set_disk_consistent_lsn(&self, new_value: Lsn) -> bool {
let old_value = self.disk_consistent_lsn.fetch_max(new_value);
assert!(new_value >= old_value, "disk_consistent_lsn must be growing monotonously at runtime; current {old_value}, offered {new_value}");
self.metrics
.disk_consistent_lsn_gauge
.set(new_value.0 as i64);
new_value != old_value
}
@@ -5931,23 +5887,6 @@ impl<'a> TimelineWriter<'a> {
return Ok(());
}
// In debug builds, assert that we don't write any keys that don't belong to this shard.
// We don't assert this in release builds, since key ownership policies may change over
// time. Stray keys will be removed during compaction.
if cfg!(debug_assertions) {
for metadata in &batch.metadata {
if let ValueMeta::Serialized(metadata) = metadata {
let key = Key::from_compact(metadata.key);
assert!(
self.shard_identity.is_key_local(&key)
|| self.shard_identity.is_key_global(&key),
"key {key} does not belong on shard {}",
self.shard_identity.shard_index()
);
}
}
}
let batch_max_lsn = batch.max_lsn;
let buf_size: u64 = batch.buffer_size() as u64;

View File

@@ -10,12 +10,13 @@ use std::sync::Arc;
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder,
ImageLayerCreationMode, RecordedDuration, Timeline,
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
RecordedDuration, Timeline,
};
use anyhow::{anyhow, bail, Context};
use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::key::KEY_SIZE;
@@ -29,6 +30,7 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -41,7 +43,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
@@ -62,15 +64,6 @@ use super::CompactionError;
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
const COMPACTION_DELTA_THRESHOLD: usize = 5;
/// A scheduled compaction task.
pub(crate) struct ScheduledCompactionTask {
pub options: CompactOptions,
/// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
pub gc_block: Option<gc_block::Guard>,
}
pub struct GcCompactionJobDescription {
/// All layers to read in the compaction job
selected_layers: Vec<Layer>,
@@ -1181,12 +1174,11 @@ impl Timeline {
.await
.map_err(CompactionError::Other)?;
} else {
let shard = self.shard_identity.shard_index();
let owner = self.shard_identity.get_shard_number(&key);
if cfg!(debug_assertions) {
panic!("key {key} does not belong on shard {shard}, owned by {owner}");
}
debug!("dropping key {key} during compaction (it belongs on shard {owner})");
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
}
if !new_layers.is_empty() {
@@ -1754,113 +1746,22 @@ impl Timeline {
Ok(())
}
/// Split a gc-compaction job into multiple compaction jobs. Optimally, this function should return a vector of
/// `GcCompactionJobDesc`. But we want to keep it simple on the tenant scheduling side without exposing too much
/// ad-hoc information about gc compaction itself.
pub(crate) async fn gc_compaction_split_jobs(
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
cancel: &CancellationToken,
options: CompactOptions,
) -> anyhow::Result<Vec<CompactOptions>> {
if !options.sub_compaction {
return Ok(vec![options]);
}
let compact_range = options.compact_range.clone().unwrap_or(CompactRange {
start: Key::MIN,
end: Key::MAX,
});
let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
compact_below_lsn
} else {
*self.get_latest_gc_cutoff_lsn() // use the real gc cutoff
};
let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock");
};
let ((dense_ks, sparse_ks), _) = &*partition;
// Truncate the key range to be within user specified compaction range.
fn truncate_to(
source_start: &Key,
source_end: &Key,
target_start: &Key,
target_end: &Key,
) -> Option<(Key, Key)> {
let start = source_start.max(target_start);
let end = source_end.min(target_end);
if start < end {
Some((*start, *end))
} else {
None
}
}
let mut split_key_ranges = Vec::new();
let ranges = dense_ks
.parts
.iter()
.map(|partition| partition.ranges.iter())
.chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
.flatten()
.cloned()
.collect_vec();
for range in ranges.iter() {
let Some((start, end)) = truncate_to(
&range.start,
&range.end,
&compact_range.start,
&compact_range.end,
) else {
continue;
};
split_key_ranges.push((start, end));
}
split_key_ranges.sort();
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
let mut current_start = None;
// Split compaction job to about 2GB each
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be configuration in the future
let ranges_num = split_key_ranges.len();
for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
if current_start.is_none() {
current_start = Some(start);
}
let start = current_start.unwrap();
if start >= end {
// We have already processed this partition.
continue;
}
let res = layer_map.range_search(start..end, compact_below_lsn);
let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
let mut compact_options = options.clone();
// Try to extend the compaction range so that we include at least one full layer file.
let extended_end = res
.found
.keys()
.map(|layer| layer.layer.key_range.end)
.min();
// It is possible that the search range does not contain any layer files when we reach the end of the loop.
// In this case, we simply use the specified key range end.
let end = if let Some(extended_end) = extended_end {
extended_end.max(end)
} else {
end
};
info!(
"splitting compaction job: {}..{}, estimated_size={}",
start, end, total_size
);
compact_options.compact_range = Some(CompactRange { start, end });
compact_options.compact_below_lsn = Some(compact_below_lsn);
compact_options.sub_compaction = false;
compact_jobs.push(compact_options);
current_start = Some(end);
}
}
drop(guard);
Ok(compact_jobs)
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.partial_compact_with_gc(
options
.compact_range
.map(|range| range.start..range.end)
.unwrap_or_else(|| Key::MIN..Key::MAX),
cancel,
options.flags,
ctx,
)
.await
}
/// An experimental compaction building block that combines compaction with garbage collection.
@@ -1870,51 +1771,19 @@ impl Timeline {
/// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
/// and create delta layers with all deltas >= gc horizon.
///
/// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
/// If `key_range` is provided, it will only compact the keys within the range, aka partial compaction.
/// Partial compaction will read and process all layers overlapping with the key range, even if it might
/// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
/// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
/// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
/// part of the range.
///
/// If `options.compact_below_lsn` is provided, the compaction will only compact layers below or intersect with
/// the LSN. Otherwise, it will use the gc cutoff by default.
pub(crate) async fn compact_with_gc(
pub(crate) async fn partial_compact_with_gc(
self: &Arc<Self>,
compaction_key_range: Range<Key>,
cancel: &CancellationToken,
options: CompactOptions,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
if options.sub_compaction {
info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = self.gc_compaction_split_jobs(options).await?;
let jobs_len = jobs.len();
for (idx, job) in jobs.into_iter().enumerate() {
info!(
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx).await?;
}
if jobs_len == 0 {
info!("no jobs to run, skipping gc bottom-most compaction");
}
return Ok(());
}
self.compact_with_gc_inner(cancel, options, ctx).await
}
async fn compact_with_gc_inner(
self: &Arc<Self>,
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
assert!(
!options.sub_compaction,
"sub-compaction should be handled by the outer function"
);
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
@@ -1934,12 +1803,6 @@ impl Timeline {
)
.await?;
let flags = options.flags;
let compaction_key_range = options
.compact_range
.map(|range| range.start..range.end)
.unwrap_or_else(|| Key::MIN..Key::MAX);
let dry_run = flags.contains(CompactFlags::DryRun);
if compaction_key_range == (Key::MIN..Key::MAX) {
@@ -1963,22 +1826,7 @@ impl Timeline {
let layers = guard.layer_map()?;
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = {
// Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
// Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
// cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
// to get the truth data.
let real_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
// The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
// each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use
// the real cutoff.
let mut gc_cutoff = options.compact_below_lsn.unwrap_or(real_gc_cutoff);
if gc_cutoff > real_gc_cutoff {
warn!("provided compact_below_lsn={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
gc_cutoff = real_gc_cutoff;
}
gc_cutoff
};
let gc_cutoff = gc_info.cutoffs.select_min();
for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
if lsn < &gc_cutoff {
retain_lsns_below_horizon.push(*lsn);
@@ -1998,7 +1846,7 @@ impl Timeline {
.map(|desc| desc.get_lsn_range().end)
.max()
else {
info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
info!("no layers to compact with gc");
return Ok(());
};
// Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
@@ -2021,7 +1869,7 @@ impl Timeline {
}
}
if selected_layers.is_empty() {
info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compaction_key_range.start, compaction_key_range.end);
info!("no layers to compact with gc");
return Ok(());
}
retain_lsns_below_horizon.sort();
@@ -2088,15 +1936,14 @@ impl Timeline {
// Step 1: construct a k-merge iterator over all layers.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
// disable the check for now because we need to adjust the check for partial compactions, will enable later.
// let layer_names = job_desc
// .selected_layers
// .iter()
// .map(|layer| layer.layer_desc().layer_name())
// .collect_vec();
// if let Some(err) = check_valid_layermap(&layer_names) {
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
// }
let layer_names = job_desc
.selected_layers
.iter()
.map(|layer| layer.layer_desc().layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
}
// The maximum LSN we are processing in this compaction loop
let end_lsn = job_desc
.selected_layers
@@ -2201,11 +2048,6 @@ impl Timeline {
// This is not handled in the filter iterator because shard is determined by hash.
// Therefore, it does not give us any performance benefit to do things like skip
// a whole layer file as handling key spaces (ranges).
if cfg!(debug_assertions) {
let shard = self.shard_identity.shard_index();
let owner = self.shard_identity.get_shard_number(&key);
panic!("key {key} does not belong on shard {shard}, owned by {owner}");
}
continue;
}
if !job_desc.compaction_key_range.contains(&key) {

View File

@@ -369,13 +369,6 @@ pub(super) async fn handle_walreceiver_connection(
// advances it to its end LSN. 0 is just an initialization placeholder.
let mut modification = timeline.begin_modification(Lsn(0));
if !records.is_empty() {
timeline
.metrics
.wal_records_received
.inc_by(records.len() as u64);
}
for interpreted in records {
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
&& uncommitted_records > 0
@@ -517,7 +510,6 @@ pub(super) async fn handle_walreceiver_connection(
}
// Ingest the records without immediately committing them.
timeline.metrics.wal_records_received.inc();
let ingested = walingest
.ingest_record(interpreted, &mut modification, &ctx)
.await

View File

@@ -132,7 +132,6 @@ where
.expect("must not use after we returned an error")
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn write_buffered_borrowed(
&mut self,
chunk: &[u8],

View File

@@ -582,21 +582,18 @@ impl WalIngest {
forknum: FSM_FORKNUM,
};
// Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
// and instead of digging in the FSM bitmap format we just clear the whole page.
let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
&& self
.shard
.is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
{
if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
// Tail of last remaining FSM page has to be zeroed.
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
fsm_physical_page_no += 1;
}
// Truncate this shard's view of the FSM relation size, if it even has one.
// TODO: re-examine the None case here wrt. sharding; should we error?
let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
if nblocks > fsm_physical_page_no {
// check if something to do: FSM is larger than truncate position
self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
.await?;
}
@@ -620,7 +617,7 @@ impl WalIngest {
// tail bits in the last remaining map page, representing truncated heap
// blocks, need to be cleared. This is not only tidy, but also necessary
// because we don't get a chance to clear the bits if the heap is extended
// again. Only do this on the shard that owns the page.
// again.
if (trunc_byte != 0 || trunc_offs != 0)
&& self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
{
@@ -634,9 +631,10 @@ impl WalIngest {
)?;
vm_page_no += 1;
}
// Truncate this shard's view of the VM relation size, if it even has one.
// TODO: re-examine the None case here wrt. sharding; should we error?
let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
if nblocks > vm_page_no {
// check if something to do: VM is larger than truncate position
self.put_rel_truncation(modification, rel, vm_page_no, ctx)
.await?;
}

View File

@@ -22,7 +22,6 @@
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "portability/instr_time.h"
#include "postmaster/interrupt.h"
#include "storage/buf_internals.h"
#include "storage/ipc.h"
@@ -119,11 +118,6 @@ typedef struct
*/
PSConnectionState state;
PGconn *conn;
/* request / response counters for debugging */
uint64 nrequests_sent;
uint64 nresponses_received;
/*---
* WaitEventSet containing:
* - WL_SOCKET_READABLE on 'conn'
@@ -634,8 +628,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
}
shard->state = PS_Connected;
shard->nrequests_sent = 0;
shard->nresponses_received = 0;
}
/* FALLTHROUGH */
case PS_Connected:
@@ -664,27 +656,6 @@ call_PQgetCopyData(shardno_t shard_no, char **buffer)
int ret;
PageServer *shard = &page_servers[shard_no];
PGconn *pageserver_conn = shard->conn;
instr_time now,
start_ts,
since_start,
last_log_ts,
since_last_log;
bool logged = false;
/*
* As a debugging aid, if we don't get a response for a long time, print a
* log message.
*
* 10 s is a very generous threshold, normally we expect a response in a
* few milliseconds. We have metrics to track latencies in normal ranges,
* but in the cases that take exceptionally long, it's useful to log the
* exact timestamps.
*/
#define LOG_INTERVAL_US UINT64CONST(10 * 1000000)
INSTR_TIME_SET_CURRENT(now);
start_ts = last_log_ts = now;
INSTR_TIME_SET_ZERO(since_last_log);
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
@@ -692,12 +663,9 @@ retry:
if (ret == 0)
{
WaitEvent event;
long timeout;
timeout = Min(0, LOG_INTERVAL_US - INSTR_TIME_GET_MICROSEC(since_last_log));
/* Sleep until there's something to do */
(void) WaitEventSetWait(shard->wes_read, timeout, &event, 1,
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1,
WAIT_EVENT_NEON_PS_READ);
ResetLatch(MyLatch);
@@ -716,40 +684,9 @@ retry:
}
}
/*
* Print a message to the log if a long time has passed with no
* response.
*/
INSTR_TIME_SET_CURRENT(now);
since_last_log = now;
INSTR_TIME_SUBTRACT(since_last_log, last_log_ts);
if (INSTR_TIME_GET_MICROSEC(since_last_log) >= LOG_INTERVAL_US)
{
since_start = now;
INSTR_TIME_SUBTRACT(since_start, start_ts);
neon_shard_log(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses)",
INSTR_TIME_GET_DOUBLE(since_start),
shard->nrequests_sent, shard->nresponses_received);
last_log_ts = now;
logged = true;
}
goto retry;
}
/*
* If we logged earlier that the response is taking a long time, log
* another message when the response is finally received.
*/
if (logged)
{
INSTR_TIME_SET_CURRENT(now);
since_start = now;
INSTR_TIME_SUBTRACT(since_start, start_ts);
neon_shard_log(shard_no, LOG, "received response from pageserver after %0.3f s",
INSTR_TIME_GET_DOUBLE(since_start));
}
return ret;
}
@@ -849,7 +786,6 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
* PGRES_POLLING_WRITING state. It's kinda dirty to disconnect at this
* point, but on the grand scheme of things it's only a small issue.
*/
shard->nrequests_sent++;
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
@@ -942,7 +878,6 @@ pageserver_receive(shardno_t shard_no)
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
}
shard->nresponses_received++;
return (NeonResponse *) resp;
}

View File

@@ -423,11 +423,7 @@ readahead_buffer_resize(int newsize, void *extra)
* ensuring we have received all but the last n requests (n = newsize).
*/
if (MyPState->n_requests_inflight > newsize)
{
Assert(MyPState->ring_unused >= MyPState->n_requests_inflight - newsize);
prefetch_wait_for(MyPState->ring_unused - (MyPState->n_requests_inflight - newsize));
Assert(MyPState->n_requests_inflight <= newsize);
}
prefetch_wait_for(MyPState->ring_unused - newsize);
/* construct the new PrefetchState, and copy over the memory contexts */
newPState = MemoryContextAllocZero(TopMemoryContext, newprfs_size);
@@ -442,6 +438,7 @@ readahead_buffer_resize(int newsize, void *extra)
newPState->ring_last = newsize;
newPState->ring_unused = newsize;
newPState->ring_receive = newsize;
newPState->ring_flush = newsize;
newPState->max_shard_no = MyPState->max_shard_no;
memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap));
@@ -492,7 +489,6 @@ readahead_buffer_resize(int newsize, void *extra)
}
newPState->n_unused -= 1;
}
newPState->ring_flush = newPState->ring_receive;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
@@ -502,7 +498,6 @@ readahead_buffer_resize(int newsize, void *extra)
for (; end >= MyPState->ring_last && end != UINT64_MAX; end -= 1)
{
PrefetchRequest *slot = GetPrfSlot(end);
Assert(slot->status != PRFS_REQUESTED);
if (slot->status == PRFS_RECEIVED)
{
pfree(slot->response);
@@ -615,9 +610,6 @@ prefetch_read(PrefetchRequest *slot)
{
NeonResponse *response;
MemoryContext old;
BufferTag buftag;
shardno_t shard_no;
uint64 my_ring_index;
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
@@ -631,29 +623,11 @@ prefetch_read(PrefetchRequest *slot)
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
/*
* Copy the request info so that if an error happens and the prefetch
* queue is flushed during the receive call, we can print the original
* values in the error message
*/
buftag = slot->buftag;
shard_no = slot->shard_no;
my_ring_index = slot->my_ring_index;
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(shard_no);
response = (NeonResponse *) page_server->receive(slot->shard_no);
MemoryContextSwitchTo(old);
if (response)
{
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(shard_no, ERROR,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
@@ -668,15 +642,11 @@ prefetch_read(PrefetchRequest *slot)
}
else
{
/*
* Note: The slot might no longer be valid, if the connection was lost
* and the prefetch queue was flushed during the receive call
*/
neon_shard_log(shard_no, LOG,
neon_shard_log(slot->shard_no, LOG,
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
(long) my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)),
buftag.forkNum, buftag.blockNum);
(long)slot->my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
slot->buftag.forkNum, slot->buftag.blockNum);
return false;
}
}

View File

@@ -70,10 +70,6 @@ impl std::fmt::Display for Backend<'_, ()> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ControlPlane(api, ()) => match &**api {
ControlPlaneClient::ProxyV1(endpoint) => fmt
.debug_tuple("ControlPlane::ProxyV1")
.field(&endpoint.url())
.finish(),
ControlPlaneClient::Neon(endpoint) => fmt
.debug_tuple("ControlPlane::Neon")
.field(&endpoint.url())

View File

@@ -46,9 +46,6 @@ enum AuthBackendType {
#[value(name("console"), alias("cplane"))]
ControlPlane,
#[value(name("cplane-v1"), alias("control-plane"))]
ControlPlaneV1,
#[value(name("link"), alias("control-redirect"))]
ConsoleRedirect,
@@ -521,39 +518,6 @@ async fn main() -> anyhow::Result<()> {
.instrument(span),
);
}
} else if let proxy::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api {
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
(client1, client2) => {
let cache = api.caches.project_info.clone();
if let Some(client) = client1 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
if let Some(client) = client2 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
}
if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(
async move { cache.do_read(con, cancellation_token.clone()).await }
.instrument(span),
);
}
}
}
@@ -698,65 +662,6 @@ fn build_auth_backend(
args: &ProxyCliArgs,
) -> anyhow::Result<Either<&'static auth::Backend<'static, ()>, &'static ConsoleRedirectBackend>> {
match &args.auth_backend {
AuthBackendType::ControlPlaneV1 => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
args.project_info_cache.parse()?;
let endpoint_cache_config: config::EndpointCacheConfig =
args.endpoint_cache_config.parse()?;
info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}");
info!(
"Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}"
);
info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}");
let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new(
wake_compute_cache_config,
project_info_cache_config,
endpoint_cache_config,
)));
let config::ConcurrencyLockOptions {
shards,
limiter,
epoch,
timeout,
} = args.wake_compute_lock.parse()?;
info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new(
"wake_compute_lock",
limiter,
shards,
timeout,
epoch,
&Metrics::get().wake_compute_lock,
)?));
tokio::spawn(locks.garbage_collect_worker());
let url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),
caches,
locks,
wake_compute_endpoint_rate_limiter,
);
let api = control_plane::client::ControlPlaneClient::ProxyV1(api);
let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
let config = Box::leak(Box::new(auth_backend));
Ok(Either::Left(config))
}
AuthBackendType::ControlPlane => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
@@ -792,15 +697,13 @@ fn build_auth_backend(
)?));
tokio::spawn(locks.garbage_collect_worker());
let url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;
let url = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
let api = control_plane::client::neon::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),

View File

@@ -115,8 +115,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
};
if !self.limiter.lock().unwrap().check(subnet_key, 1) {
// log only the subnet part of the IP address to know which subnet is rate limited
tracing::warn!("Rate limit exceeded. Skipping cancellation message, {subnet_key}");
tracing::debug!("Rate limit exceeded. Skipping cancellation message");
Metrics::get()
.proxy
.cancellation_requests_total

View File

@@ -163,36 +163,32 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, stream, tls, record_handshake_error);
let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
.await??
{
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
// spawn a task to cancel the session, but don't wait for it
cancellations.spawn({
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session_id = ctx.session_id();
let peer_ip = ctx.peer_addr();
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
cancel_span.follows_from(tracing::Span::current());
async move {
drop(
cancellation_handler_clone
.cancel_session(
cancel_key_data,
session_id,
peer_ip,
config.authentication_config.ip_allowlist_check_enabled,
)
.instrument(cancel_span)
.await,
);
}
});
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
// spawn a task to cancel the session, but don't wait for it
cancellations.spawn({
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session_id = ctx.session_id();
let peer_ip = ctx.peer_addr();
async move {
drop(
cancellation_handler_clone
.cancel_session(
cancel_key_data,
session_id,
peer_ip,
config.authentication_config.ip_allowlist_check_enabled,
)
.await,
);
}
});
return Ok(None);
}
};
return Ok(None);
}
};
drop(pause);
ctx.set_db_options(params.clone());

View File

@@ -1,514 +0,0 @@
//! Production console backend.
use std::sync::Arc;
use std::time::Duration;
use ::http::header::AUTHORIZATION;
use ::http::HeaderName;
use futures::TryFutureExt;
use postgres_client::config::SslMode;
use tokio::time::Instant;
use tracing::{debug, info, info_span, warn, Instrument};
use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute};
use crate::auth::backend::jwt::AuthRule;
use crate::auth::backend::ComputeUserInfo;
use crate::cache::Cached;
use crate::context::RequestContext;
use crate::control_plane::caches::ApiCaches;
use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
};
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason};
use crate::control_plane::{
AuthInfo, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, NodeInfo,
};
use crate::metrics::{CacheOutcome, Metrics};
use crate::rate_limiter::WakeComputeRateLimiter;
use crate::types::{EndpointCacheKey, EndpointId};
use crate::{compute, http, scram};
const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
#[derive(Clone)]
pub struct NeonControlPlaneClient {
endpoint: http::Endpoint,
pub caches: &'static ApiCaches,
pub(crate) locks: &'static ApiLocks<EndpointCacheKey>,
pub(crate) wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
// put in a shared ref so we don't copy secrets all over in memory
jwt: Arc<str>,
}
impl NeonControlPlaneClient {
/// Construct an API object containing the auth parameters.
pub fn new(
endpoint: http::Endpoint,
jwt: Arc<str>,
caches: &'static ApiCaches,
locks: &'static ApiLocks<EndpointCacheKey>,
wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
) -> Self {
Self {
endpoint,
caches,
locks,
wake_compute_endpoint_rate_limiter,
jwt,
}
}
pub(crate) fn url(&self) -> &str {
self.endpoint.url().as_str()
}
async fn do_get_auth_info(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<AuthInfo, GetAuthInfoError> {
if !self
.caches
.endpoints_cache
.is_valid(ctx, &user_info.endpoint.normalize())
{
// TODO: refactor this because it's weird
// this is a failure to authenticate but we return Ok.
info!("endpoint is not valid, skipping the request");
return Ok(AuthInfo::default());
}
let request_id = ctx.session_id().to_string();
let application_name = ctx.console_application_name();
async {
let request = self
.endpoint
.get_path("get_endpoint_access_control")
.header(X_REQUEST_ID, &request_id)
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
.query(&[("session_id", ctx.session_id())])
.query(&[
("application_name", application_name.as_str()),
("endpointish", user_info.endpoint.as_str()),
("role", user_info.user.as_str()),
])
.build()?;
debug!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = match parse_body::<GetEndpointAccessControl>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
// TODO(anna): retry
Err(e) => {
return if e.get_reason().is_not_found() {
// TODO: refactor this because it's weird
// this is a failure to authenticate but we return Ok.
Ok(AuthInfo::default())
} else {
Err(e.into())
};
}
};
// Ivan: don't know where it will be used, so I leave it here
let _endpoint_vpc_ids = body.allowed_vpc_endpoint_ids.unwrap_or_default();
let secret = if body.role_secret.is_empty() {
None
} else {
let secret = scram::ServerSecret::parse(&body.role_secret)
.map(AuthSecret::Scram)
.ok_or(GetAuthInfoError::BadSecret)?;
Some(secret)
};
let allowed_ips = body.allowed_ips.unwrap_or_default();
Metrics::get()
.proxy
.allowed_ips_number
.observe(allowed_ips.len() as f64);
Ok(AuthInfo {
secret,
allowed_ips,
project_id: body.project_id,
})
}
.inspect_err(|e| tracing::debug!(error = ?e))
.instrument(info_span!("do_get_auth_info"))
.await
}
async fn do_get_endpoint_jwks(
&self,
ctx: &RequestContext,
endpoint: EndpointId,
) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
if !self
.caches
.endpoints_cache
.is_valid(ctx, &endpoint.normalize())
{
return Err(GetEndpointJwksError::EndpointNotFound);
}
let request_id = ctx.session_id().to_string();
async {
let request = self
.endpoint
.get_with_url(|url| {
url.path_segments_mut()
.push("endpoints")
.push(endpoint.as_str())
.push("jwks");
})
.header(X_REQUEST_ID, &request_id)
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
.query(&[("session_id", ctx.session_id())])
.build()
.map_err(GetEndpointJwksError::RequestBuild)?;
debug!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let response = self
.endpoint
.execute(request)
.await
.map_err(GetEndpointJwksError::RequestExecute)?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = parse_body::<EndpointJwksResponse>(response).await?;
let rules = body
.jwks
.into_iter()
.map(|jwks| AuthRule {
id: jwks.id,
jwks_url: jwks.jwks_url,
audience: jwks.jwt_audience,
role_names: jwks.role_names,
})
.collect();
Ok(rules)
}
.inspect_err(|e| tracing::debug!(error = ?e))
.instrument(info_span!("do_get_endpoint_jwks"))
.await
}
async fn do_wake_compute(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<NodeInfo, WakeComputeError> {
let request_id = ctx.session_id().to_string();
let application_name = ctx.console_application_name();
async {
let mut request_builder = self
.endpoint
.get_path("wake_compute")
.header("X-Request-ID", &request_id)
.header("Authorization", format!("Bearer {}", &self.jwt))
.query(&[("session_id", ctx.session_id())])
.query(&[
("application_name", application_name.as_str()),
("endpointish", user_info.endpoint.as_str()),
]);
let options = user_info.options.to_deep_object();
if !options.is_empty() {
request_builder = request_builder.query(&options);
}
let request = request_builder.build()?;
debug!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = parse_body::<WakeCompute>(response).await?;
// Unfortunately, ownership won't let us use `Option::ok_or` here.
let (host, port) = match parse_host_port(&body.address) {
None => return Err(WakeComputeError::BadComputeAddress(body.address)),
Some(x) => x,
};
// Don't set anything but host and port! This config will be cached.
// We'll set username and such later using the startup message.
// TODO: add more type safety (in progress).
let mut config = compute::ConnCfg::new(host.to_owned(), port);
config.ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes.
let node = NodeInfo {
config,
aux: body.aux,
allow_self_signed_compute: false,
};
Ok(node)
}
.inspect_err(|e| tracing::debug!(error = ?e))
.instrument(info_span!("do_wake_compute"))
.await
}
}
impl super::ControlPlaneApi for NeonControlPlaneClient {
#[tracing::instrument(skip_all)]
async fn get_role_secret(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
let normalized_ep = &user_info.endpoint.normalize();
let user = &user_info.user;
if let Some(role_secret) = self
.caches
.project_info
.get_role_secret(normalized_ep, user)
{
return Ok(role_secret);
}
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
if let Some(project_id) = auth_info.project_id {
let normalized_ep_int = normalized_ep.into();
self.caches.project_info.insert_role_secret(
project_id,
normalized_ep_int,
user.into(),
auth_info.secret.clone(),
);
self.caches.project_info.insert_allowed_ips(
project_id,
normalized_ep_int,
Arc::new(auth_info.allowed_ips),
);
ctx.set_project_id(project_id);
}
// When we just got a secret, we don't need to invalidate it.
Ok(Cached::new_uncached(auth_info.secret))
}
async fn get_allowed_ips_and_secret(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
let normalized_ep = &user_info.endpoint.normalize();
if let Some(allowed_ips) = self.caches.project_info.get_allowed_ips(normalized_ep) {
Metrics::get()
.proxy
.allowed_ips_cache_misses
.inc(CacheOutcome::Hit);
return Ok((allowed_ips, None));
}
Metrics::get()
.proxy
.allowed_ips_cache_misses
.inc(CacheOutcome::Miss);
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
let allowed_ips = Arc::new(auth_info.allowed_ips);
let user = &user_info.user;
if let Some(project_id) = auth_info.project_id {
let normalized_ep_int = normalized_ep.into();
self.caches.project_info.insert_role_secret(
project_id,
normalized_ep_int,
user.into(),
auth_info.secret.clone(),
);
self.caches.project_info.insert_allowed_ips(
project_id,
normalized_ep_int,
allowed_ips.clone(),
);
ctx.set_project_id(project_id);
}
Ok((
Cached::new_uncached(allowed_ips),
Some(Cached::new_uncached(auth_info.secret)),
))
}
#[tracing::instrument(skip_all)]
async fn get_endpoint_jwks(
&self,
ctx: &RequestContext,
endpoint: EndpointId,
) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
self.do_get_endpoint_jwks(ctx, endpoint).await
}
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = user_info.endpoint_cache_key();
macro_rules! check_cache {
() => {
if let Some(cached) = self.caches.node_info.get(&key) {
let (cached, info) = cached.take_value();
let info = info.map_err(|c| {
info!(key = &*key, "found cached wake_compute error");
WakeComputeError::ControlPlane(ControlPlaneError::Message(Box::new(*c)))
})?;
debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone());
return Ok(cached.map(|()| info));
}
};
}
// Every time we do a wakeup http request, the compute node will stay up
// for some time (highly depends on the console's scale-to-zero policy);
// The connection info remains the same during that period of time,
// which means that we might cache it to reduce the load and latency.
check_cache!();
let permit = self.locks.get_permit(&key).await?;
// after getting back a permit - it's possible the cache was filled
// double check
if permit.should_check_cache() {
// TODO: if there is something in the cache, mark the permit as success.
check_cache!();
}
// check rate limit
if !self
.wake_compute_endpoint_rate_limiter
.check(user_info.endpoint.normalize_intern(), 1)
{
return Err(WakeComputeError::TooManyConnections);
}
let node = permit.release_result(self.do_wake_compute(ctx, user_info).await);
match node {
Ok(node) => {
ctx.set_project(node.aux.clone());
debug!(key = &*key, "created a cache entry for woken compute node");
let mut stored_node = node.clone();
// store the cached node as 'warm_cached'
stored_node.aux.cold_start_info = ColdStartInfo::WarmCached;
let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node));
Ok(cached.map(|()| node))
}
Err(err) => match err {
WakeComputeError::ControlPlane(ControlPlaneError::Message(err)) => {
let Some(status) = &err.status else {
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
};
let reason = status
.details
.error_info
.map_or(Reason::Unknown, |x| x.reason);
// if we can retry this error, do not cache it.
if reason.can_retry() {
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
}
// at this point, we should only have quota errors.
debug!(
key = &*key,
"created a cache entry for the wake compute error"
);
self.caches.node_info.insert_ttl(
key,
Err(err.clone()),
Duration::from_secs(30),
);
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)))
}
err => return Err(err),
},
}
}
}
/// Parse http response body, taking status code into account.
async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
response: http::Response,
) -> Result<T, ControlPlaneError> {
let status = response.status();
if status.is_success() {
// We shouldn't log raw body because it may contain secrets.
info!("request succeeded, processing the body");
return Ok(response.json().await?);
}
let s = response.bytes().await?;
// Log plaintext to be able to detect, whether there are some cases not covered by the error struct.
info!("response_error plaintext: {:?}", s);
// Don't throw an error here because it's not as important
// as the fact that the request itself has failed.
let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
warn!("failed to parse error body: {e}");
ControlPlaneErrorMessage {
error: "reason unclear (malformed error message)".into(),
http_status_code: status,
status: None,
}
});
body.http_status_code = status;
warn!("console responded with an error ({status}): {body:?}");
Err(ControlPlaneError::Message(Box::new(body)))
}
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
let (host, port) = input.rsplit_once(':')?;
let ipv6_brackets: &[_] = &['[', ']'];
Some((host.trim_matches(ipv6_brackets), port.parse().ok()?))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_host_port_v4() {
let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 5432);
}
#[test]
fn test_parse_host_port_v6() {
let (host, port) = parse_host_port("[2001:db8::1]:5432").expect("failed to parse");
assert_eq!(host, "2001:db8::1");
assert_eq!(port, 5432);
}
#[test]
fn test_parse_host_port_url() {
let (host, port) = parse_host_port("compute-foo-bar-1234.default.svc.cluster.local:5432")
.expect("failed to parse");
assert_eq!(host, "compute-foo-bar-1234.default.svc.cluster.local");
assert_eq!(port, 5432);
}
}

View File

@@ -1,4 +1,3 @@
pub mod cplane_proxy_v1;
#[cfg(any(test, feature = "testing"))]
pub mod mock;
pub mod neon;
@@ -28,8 +27,6 @@ use crate::types::EndpointId;
#[non_exhaustive]
#[derive(Clone)]
pub enum ControlPlaneClient {
/// New Proxy V1 control plane API
ProxyV1(cplane_proxy_v1::NeonControlPlaneClient),
/// Current Management API (V2).
Neon(neon::NeonControlPlaneClient),
/// Local mock control plane.
@@ -48,7 +45,6 @@ impl ControlPlaneApi for ControlPlaneClient {
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, errors::GetAuthInfoError> {
match self {
Self::ProxyV1(api) => api.get_role_secret(ctx, user_info).await,
Self::Neon(api) => api.get_role_secret(ctx, user_info).await,
#[cfg(any(test, feature = "testing"))]
Self::PostgresMock(api) => api.get_role_secret(ctx, user_info).await,
@@ -65,7 +61,6 @@ impl ControlPlaneApi for ControlPlaneClient {
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> {
match self {
Self::ProxyV1(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
Self::Neon(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
#[cfg(any(test, feature = "testing"))]
Self::PostgresMock(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
@@ -80,7 +75,6 @@ impl ControlPlaneApi for ControlPlaneClient {
endpoint: EndpointId,
) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError> {
match self {
Self::ProxyV1(api) => api.get_endpoint_jwks(ctx, endpoint).await,
Self::Neon(api) => api.get_endpoint_jwks(ctx, endpoint).await,
#[cfg(any(test, feature = "testing"))]
Self::PostgresMock(api) => api.get_endpoint_jwks(ctx, endpoint).await,
@@ -95,7 +89,6 @@ impl ControlPlaneApi for ControlPlaneClient {
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError> {
match self {
Self::ProxyV1(api) => api.wake_compute(ctx, user_info).await,
Self::Neon(api) => api.wake_compute(ctx, user_info).await,
#[cfg(any(test, feature = "testing"))]
Self::PostgresMock(api) => api.wake_compute(ctx, user_info).await,

View File

@@ -1,4 +1,4 @@
//! Stale console backend, remove after migrating to Proxy V1 API (#15245).
//! Production console backend.
use std::sync::Arc;
use std::time::Duration;

View File

@@ -230,16 +230,6 @@ pub(crate) struct GetRoleSecret {
pub(crate) project_id: Option<ProjectIdInt>,
}
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
/// Returned by the `/get_endpoint_access_control` API method.
#[derive(Deserialize)]
pub(crate) struct GetEndpointAccessControl {
pub(crate) role_secret: Box<str>,
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
pub(crate) project_id: Option<ProjectIdInt>,
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<EndpointIdInt>>,
}
// Manually implement debug to omit sensitive info.
impl fmt::Debug for GetRoleSecret {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

View File

@@ -272,36 +272,32 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error);
let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
.await??
{
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
// spawn a task to cancel the session, but don't wait for it
cancellations.spawn({
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session_id = ctx.session_id();
let peer_ip = ctx.peer_addr();
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
cancel_span.follows_from(tracing::Span::current());
async move {
drop(
cancellation_handler_clone
.cancel_session(
cancel_key_data,
session_id,
peer_ip,
config.authentication_config.ip_allowlist_check_enabled,
)
.instrument(cancel_span)
.await,
);
}
});
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
// spawn a task to cancel the session, but don't wait for it
cancellations.spawn({
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session_id = ctx.session_id();
let peer_ip = ctx.peer_addr();
async move {
drop(
cancellation_handler_clone
.cancel_session(
cancel_key_data,
session_id,
peer_ip,
config.authentication_config.ip_allowlist_check_enabled,
)
.await,
);
}
});
return Ok(None);
}
};
return Ok(None);
}
};
drop(pause);
ctx.set_db_options(params.clone());

View File

@@ -13,7 +13,6 @@ use crate::cache::project_info::ProjectInfoCache;
use crate::cancellation::{CancelMap, CancellationHandler};
use crate::intern::{ProjectIdInt, RoleNameInt};
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
use tracing::Instrument;
const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
pub(crate) const PROXY_CHANNEL_NAME: &str = "neondb-proxy-to-proxy-updates";
@@ -144,8 +143,6 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
let peer_addr = cancel_session
.peer_addr
.unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?cancel_session.session_id);
cancel_span.follows_from(tracing::Span::current());
// This instance of cancellation_handler doesn't have a RedisPublisherClient so it can't publish the message.
match self
.cancellation_handler
@@ -155,7 +152,6 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
peer_addr,
cancel_session.peer_addr.is_some(),
)
.instrument(cancel_span)
.await
{
Ok(()) => {}

View File

@@ -83,20 +83,14 @@ impl Env {
node_id: NodeId,
ttid: TenantTimelineId,
) -> anyhow::Result<Arc<Timeline>> {
let conf = Arc::new(self.make_conf(node_id));
let conf = self.make_conf(node_id);
let timeline_dir = get_timeline_dir(&conf, &ttid);
let remote_path = remote_timeline_path(&ttid)?;
let safekeeper = self.make_safekeeper(node_id, ttid).await?;
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
let timeline = Timeline::new(
ttid,
&timeline_dir,
&remote_path,
shared_state,
conf.clone(),
);
let timeline = Timeline::new(ttid, &timeline_dir, &remote_path, shared_state);
timeline.bootstrap(
&mut timeline.write_shared_state().await,
&conf,

View File

@@ -338,7 +338,7 @@ async fn main() -> anyhow::Result<()> {
}
};
let conf = Arc::new(SafeKeeperConf {
let conf = SafeKeeperConf {
workdir,
my_id: id,
listen_pg_addr: args.listen_pg,
@@ -368,7 +368,7 @@ async fn main() -> anyhow::Result<()> {
control_file_save_interval: args.control_file_save_interval,
partial_backup_concurrency: args.partial_backup_concurrency,
eviction_min_resident: args.eviction_min_resident,
});
};
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(
@@ -382,7 +382,7 @@ async fn main() -> anyhow::Result<()> {
/// complete, e.g. panicked, inner is error produced by task itself.
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
// fsync the datadir to make sure we have a consistent state on disk.
if !conf.no_sync {
let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
@@ -428,11 +428,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
e
})?;
let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));
// Register metrics collector for active timelines. It's important to do this
// after daemonizing, otherwise process collector will be upset.
let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
metrics::register_internal(Box::new(timeline_collector))?;
wal_backup::init_remote_storage(&conf).await;
@@ -449,8 +447,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.then(|| Handle::try_current().expect("no runtime in main"));
// Load all timelines from disk to memory.
global_timelines.init().await?;
GlobalTimelines::init(conf.clone()).await?;
let conf_ = conf.clone();
// Run everything in current thread rt, if asked.
if conf.current_thread_runtime {
info!("running in current thread runtime");
@@ -460,16 +459,14 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(wal_service::task_main(
conf.clone(),
conf_,
pg_listener,
Scope::SafekeeperData,
global_timelines.clone(),
))
// wrap with task name for error reporting
.map(|res| ("WAL service main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));
let global_timelines_ = global_timelines.clone();
let timeline_housekeeping_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
@@ -477,45 +474,40 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
loop {
tokio::time::sleep(TOMBSTONE_TTL).await;
global_timelines_.housekeeping(&TOMBSTONE_TTL);
GlobalTimelines::housekeeping(&TOMBSTONE_TTL);
}
})
.map(|res| ("Timeline map housekeeping".to_owned(), res));
tasks_handles.push(Box::pin(timeline_housekeeping_handle));
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
let conf_ = conf.clone();
let wal_service_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(wal_service::task_main(
conf.clone(),
conf_,
pg_listener_tenant_only,
Scope::Tenant,
global_timelines.clone(),
))
// wrap with task name for error reporting
.map(|res| ("WAL service tenant only main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));
}
let conf_ = conf.clone();
let http_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| HTTP_RUNTIME.handle())
.spawn(http::task_main(
conf.clone(),
http_listener,
global_timelines.clone(),
))
.spawn(http::task_main(conf_, http_listener))
.map(|res| ("HTTP service main".to_owned(), res));
tasks_handles.push(Box::pin(http_handle));
let conf_ = conf.clone();
let broker_task_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| BROKER_RUNTIME.handle())
.spawn(
broker::task_main(conf.clone(), global_timelines.clone())
.instrument(info_span!("broker")),
)
.spawn(broker::task_main(conf_).instrument(info_span!("broker")))
.map(|res| ("broker main".to_owned(), res));
tasks_handles.push(Box::pin(broker_task_handle));

View File

@@ -39,17 +39,14 @@ const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
if conf.disable_periodic_broker_push {
info!("broker push_loop is disabled, doing nothing...");
futures::future::pending::<()>().await; // sleep forever
return Ok(());
}
let active_timelines_set = global_timelines.get_global_broker_active_set();
let active_timelines_set = GlobalTimelines::get_global_broker_active_set();
let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
@@ -90,13 +87,8 @@ async fn push_loop(
/// Subscribe and fetch all the interesting data from the broker.
#[instrument(name = "broker_pull", skip_all)]
async fn pull_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
stats: Arc<BrokerStats>,
) -> Result<()> {
let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
// TODO: subscribe only to local timelines instead of all
let request = SubscribeSafekeeperInfoRequest {
@@ -121,7 +113,7 @@ async fn pull_loop(
.as_ref()
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
let ttid = parse_proto_ttid(proto_ttid)?;
if let Ok(tli) = global_timelines.get(ttid) {
if let Ok(tli) = GlobalTimelines::get(ttid) {
// Note that we also receive *our own* info. That's
// important, as it is used as an indication of live
// connection to the broker.
@@ -143,11 +135,7 @@ async fn pull_loop(
/// Process incoming discover requests. This is done in a separate task to avoid
/// interfering with the normal pull/push loops.
async fn discover_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
stats: Arc<BrokerStats>,
) -> Result<()> {
async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
let mut client =
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
@@ -183,7 +171,7 @@ async fn discover_loop(
.as_ref()
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
let ttid = parse_proto_ttid(proto_ttid)?;
if let Ok(tli) = global_timelines.get(ttid) {
if let Ok(tli) = GlobalTimelines::get(ttid) {
// we received a discovery request for a timeline we know about
discover_counter.inc();
@@ -222,10 +210,7 @@ async fn discover_loop(
bail!("end of stream");
}
pub async fn task_main(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
info!("started, broker endpoint {:?}", conf.broker_endpoint);
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
@@ -276,13 +261,13 @@ pub async fn task_main(
},
_ = ticker.tick() => {
if push_handle.is_none() {
push_handle = Some(tokio::spawn(push_loop(conf.clone(), global_timelines.clone())));
push_handle = Some(tokio::spawn(push_loop(conf.clone())));
}
if pull_handle.is_none() {
pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), global_timelines.clone(), stats.clone())));
pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
}
if discover_handle.is_none() {
discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), global_timelines.clone(), stats.clone())));
discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
}
},
_ = &mut stats_task => {}

View File

@@ -1,7 +1,9 @@
use std::sync::Arc;
use anyhow::{bail, Result};
use camino::Utf8PathBuf;
use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use std::sync::Arc;
use tokio::{
fs::OpenOptions,
io::{AsyncSeekExt, AsyncWriteExt},
@@ -12,7 +14,7 @@ use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
control_file::FileStorage,
state::TimelinePersistentState,
timeline::{TimelineError, WalResidentTimeline},
timeline::{Timeline, TimelineError, WalResidentTimeline},
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
wal_backup::copy_s3_segments,
wal_storage::{wal_file_paths, WalReader},
@@ -23,19 +25,16 @@ use crate::{
const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
pub struct Request {
pub source_ttid: TenantTimelineId,
pub source: Arc<Timeline>,
pub until_lsn: Lsn,
pub destination_ttid: TenantTimelineId,
}
pub async fn handle_request(
request: Request,
global_timelines: Arc<GlobalTimelines>,
) -> Result<()> {
pub async fn handle_request(request: Request) -> Result<()> {
// TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
// if LSN will point to the middle of a WAL record, timeline will be in "broken" state
match global_timelines.get(request.destination_ttid) {
match GlobalTimelines::get(request.destination_ttid) {
// timeline already exists. would be good to check that this timeline is the copy
// of the source timeline, but it isn't obvious how to do that
Ok(_) => return Ok(()),
@@ -47,10 +46,9 @@ pub async fn handle_request(
}
}
let source = global_timelines.get(request.source_ttid)?;
let source_tli = source.wal_residence_guard().await?;
let source_tli = request.source.wal_residence_guard().await?;
let conf = &global_timelines.get_global_config();
let conf = &GlobalTimelines::get_global_config();
let ttid = request.destination_ttid;
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
@@ -129,7 +127,7 @@ pub async fn handle_request(
copy_s3_segments(
wal_seg_size,
&request.source_ttid,
&request.source.ttid,
&request.destination_ttid,
first_segment,
first_ondisk_segment,
@@ -160,9 +158,7 @@ pub async fn handle_request(
// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
global_timelines
.load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
.await?;
GlobalTimelines::load_temp_timeline(request.destination_ttid, &tli_dir_path, true).await?;
Ok(())
}

View File

@@ -207,23 +207,23 @@ pub struct FileInfo {
}
/// Build debug dump response, using the provided [`Args`] filters.
pub async fn build(args: Args, global_timelines: Arc<GlobalTimelines>) -> Result<Response> {
pub async fn build(args: Args) -> Result<Response> {
let start_time = Utc::now();
let timelines_count = global_timelines.timelines_count();
let config = global_timelines.get_global_config();
let timelines_count = GlobalTimelines::timelines_count();
let config = GlobalTimelines::get_global_config();
let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
// If both tenant_id and timeline_id are specified, we can just get the
// timeline directly, without taking a snapshot of the whole list.
let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
if let Ok(tli) = global_timelines.get(ttid) {
if let Ok(tli) = GlobalTimelines::get(ttid) {
vec![tli]
} else {
vec![]
}
} else {
// Otherwise, take a snapshot of the whole list.
global_timelines.get_all()
GlobalTimelines::get_all()
};
let mut timelines = Vec::new();
@@ -344,12 +344,12 @@ fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
/// Converts SafeKeeperConf to Config, filtering out the fields that are not
/// supposed to be exposed.
fn build_config(config: Arc<SafeKeeperConf>) -> Config {
fn build_config(config: SafeKeeperConf) -> Config {
Config {
id: config.my_id,
workdir: config.workdir.clone().into(),
listen_pg_addr: config.listen_pg_addr.clone(),
listen_http_addr: config.listen_http_addr.clone(),
workdir: config.workdir.into(),
listen_pg_addr: config.listen_pg_addr,
listen_http_addr: config.listen_http_addr,
no_sync: config.no_sync,
max_offloader_lag_bytes: config.max_offloader_lag_bytes,
wal_backup_enabled: config.wal_backup_enabled,

View File

@@ -33,7 +33,7 @@ use utils::{
/// Safekeeper handler of postgres commands
pub struct SafekeeperPostgresHandler {
pub conf: Arc<SafeKeeperConf>,
pub conf: SafeKeeperConf,
/// assigned application name
pub appname: Option<String>,
pub tenant_id: Option<TenantId>,
@@ -43,7 +43,6 @@ pub struct SafekeeperPostgresHandler {
pub protocol: Option<PostgresClientProtocol>,
/// Unique connection id is logged in spans for observability.
pub conn_id: ConnectionId,
pub global_timelines: Arc<GlobalTimelines>,
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
auth: Option<(Scope, Arc<JwtAuth>)>,
claims: Option<Claims>,
@@ -315,11 +314,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
impl SafekeeperPostgresHandler {
pub fn new(
conf: Arc<SafeKeeperConf>,
conf: SafeKeeperConf,
conn_id: u32,
io_metrics: Option<TrafficMetrics>,
auth: Option<(Scope, Arc<JwtAuth>)>,
global_timelines: Arc<GlobalTimelines>,
) -> Self {
SafekeeperPostgresHandler {
conf,
@@ -333,7 +331,6 @@ impl SafekeeperPostgresHandler {
claims: None,
auth,
io_metrics,
global_timelines,
}
}
@@ -363,7 +360,7 @@ impl SafekeeperPostgresHandler {
pgb: &mut PostgresBackend<IO>,
) -> Result<(), QueryError> {
// Get timeline, handling "not found" error
let tli = match self.global_timelines.get(self.ttid) {
let tli = match GlobalTimelines::get(self.ttid) {
Ok(tli) => Ok(Some(tli)),
Err(TimelineError::NotFound(_)) => Ok(None),
Err(e) => Err(QueryError::Other(e.into())),
@@ -397,10 +394,7 @@ impl SafekeeperPostgresHandler {
&mut self,
pgb: &mut PostgresBackend<IO>,
) -> Result<(), QueryError> {
let tli = self
.global_timelines
.get(self.ttid)
.map_err(|e| QueryError::Other(e.into()))?;
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
let lsn = if self.is_walproposer_recovery() {
// walproposer should get all local WAL until flush_lsn

View File

@@ -3,16 +3,14 @@ pub mod routes;
pub use routes::make_router;
pub use safekeeper_api::models;
use std::sync::Arc;
use crate::{GlobalTimelines, SafeKeeperConf};
use crate::SafeKeeperConf;
pub async fn task_main(
conf: Arc<SafeKeeperConf>,
conf: SafeKeeperConf,
http_listener: std::net::TcpListener,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let router = make_router(conf, global_timelines)
let router = make_router(conf)
.build()
.map_err(|err| anyhow::anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();

View File

@@ -66,13 +66,6 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
.as_ref()
}
fn get_global_timelines(request: &Request<Body>) -> Arc<GlobalTimelines> {
request
.data::<Arc<GlobalTimelines>>()
.expect("unknown state type")
.clone()
}
/// Same as TermLsn, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
@@ -130,11 +123,9 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
check_permission(&request, Some(tenant_id))?;
ensure_no_body(&mut request).await?;
let global_timelines = get_global_timelines(&request);
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
// Using an `InternalServerError` should be fixed when the types support it
let delete_info = global_timelines
.delete_force_all_for_tenant(&tenant_id, only_local)
let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
.await
.map_err(ApiError::InternalServerError)?;
json_response(
@@ -165,9 +156,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
.commit_lsn
.segment_lsn(server_info.wal_seg_size as usize)
});
let global_timelines = get_global_timelines(&request);
global_timelines
.create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
.await
.map_err(ApiError::InternalServerError)?;
@@ -178,9 +167,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
/// Note: it is possible to do the same with debug_dump.
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let global_timelines = get_global_timelines(&request);
let res: Vec<TenantTimelineId> = global_timelines
.get_all()
let res: Vec<TenantTimelineId> = GlobalTimelines::get_all()
.iter()
.map(|tli| tli.ttid)
.collect();
@@ -195,8 +182,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
);
check_permission(&request, Some(ttid.tenant_id))?;
let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let (inmem, state) = tli.get_state().await;
let flush_lsn = tli.get_flush_lsn().await;
@@ -247,11 +233,9 @@ async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
check_permission(&request, Some(ttid.tenant_id))?;
ensure_no_body(&mut request).await?;
let global_timelines = get_global_timelines(&request);
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
// error handling here when we're able to.
let resp = global_timelines
.delete(&ttid, only_local)
let resp = GlobalTimelines::delete(&ttid, only_local)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, resp)
@@ -263,9 +247,8 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
let data: pull_timeline::Request = json_request(&mut request).await?;
let conf = get_conf(&request);
let global_timelines = get_global_timelines(&request);
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone(), global_timelines)
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone())
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, resp)
@@ -280,8 +263,7 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
);
check_permission(&request, Some(ttid.tenant_id))?;
let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
// so create the chan and write to it in another task.
@@ -311,19 +293,19 @@ async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Bo
check_permission(&request, None)?;
let request_data: TimelineCopyRequest = json_request(&mut request).await?;
let source_ttid = TenantTimelineId::new(
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "source_timeline_id")?,
);
let global_timelines = get_global_timelines(&request);
let source = GlobalTimelines::get(ttid)?;
copy_timeline::handle_request(copy_timeline::Request{
source_ttid,
source,
until_lsn: request_data.until_lsn,
destination_ttid: TenantTimelineId::new(source_ttid.tenant_id, request_data.target_timeline_id),
}, global_timelines)
.instrument(info_span!("copy_timeline", from=%source_ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
})
.instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
.await
.map_err(ApiError::InternalServerError)?;
@@ -340,8 +322,7 @@ async fn patch_control_file_handler(
parse_request_param(&request, "timeline_id")?,
);
let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let patch_request: patch_control_file::Request = json_request(&mut request).await?;
let response = patch_control_file::handle_request(tli, patch_request)
@@ -360,8 +341,7 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
parse_request_param(&request, "timeline_id")?,
);
let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid)?;
let tli = GlobalTimelines::get(ttid)?;
tli.write_shared_state()
.await
.sk
@@ -379,7 +359,6 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
);
check_permission(&request, Some(ttid.tenant_id))?;
let global_timelines = get_global_timelines(&request);
let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
@@ -392,7 +371,7 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
)))?,
};
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let tli = tli
.wal_residence_guard()
.await
@@ -414,8 +393,7 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respons
);
check_permission(&request, Some(ttid.tenant_id))?;
let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let response = tli
.backup_partial_reset()
@@ -437,8 +415,7 @@ async fn timeline_term_bump_handler(
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let response = tli
.term_bump(request_data.term)
.await
@@ -475,8 +452,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
standby_horizon: sk_info.standby_horizon.0,
};
let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
tli.record_safekeeper_info(proto_sk_info)
.await
.map_err(ApiError::InternalServerError)?;
@@ -530,8 +506,6 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
let dump_term_history = dump_term_history.unwrap_or(true);
let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
let global_timelines = get_global_timelines(&request);
let args = debug_dump::Args {
dump_all,
dump_control_file,
@@ -543,7 +517,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
timeline_id,
};
let resp = debug_dump::build(args, global_timelines)
let resp = debug_dump::build(args)
.await
.map_err(ApiError::InternalServerError)?;
@@ -596,10 +570,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
}
/// Safekeeper http router.
pub fn make_router(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> RouterBuilder<hyper::Body, ApiError> {
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
let mut router = endpoint::make_router();
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
@@ -621,8 +592,7 @@ pub fn make_router(
// located nearby (/safekeeper/src/http/openapi_spec.yaml).
let auth = conf.http_auth.clone();
router
.data(conf)
.data(global_timelines)
.data(Arc::new(conf))
.data(auth)
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))

View File

@@ -11,6 +11,7 @@ use postgres_backend::QueryError;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::*;
use utils::id::TenantTimelineId;
use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
@@ -20,6 +21,7 @@ use crate::safekeeper::{
use crate::safekeeper::{Term, TermHistory, TermLsn};
use crate::state::TimelinePersistentState;
use crate::timeline::WalResidentTimeline;
use crate::GlobalTimelines;
use postgres_backend::PostgresBackend;
use postgres_ffi::encode_logical_message;
use postgres_ffi::WAL_SEGMENT_SIZE;
@@ -68,7 +70,7 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
info!("JSON_CTRL request: {append_request:?}");
// need to init safekeeper state before AppendRequest
let tli = prepare_safekeeper(spg, append_request.pg_version).await?;
let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
// if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected {
@@ -97,22 +99,20 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
/// Prepare safekeeper to process append requests without crashes,
/// by sending ProposerGreeting with default server.wal_seg_size.
async fn prepare_safekeeper(
spg: &SafekeeperPostgresHandler,
ttid: TenantTimelineId,
pg_version: u32,
) -> anyhow::Result<WalResidentTimeline> {
let tli = spg
.global_timelines
.create(
spg.ttid,
ServerInfo {
pg_version,
wal_seg_size: WAL_SEGMENT_SIZE as u32,
system_id: 0,
},
Lsn::INVALID,
Lsn::INVALID,
)
.await?;
let tli = GlobalTimelines::create(
ttid,
ServerInfo {
pg_version,
wal_seg_size: WAL_SEGMENT_SIZE as u32,
system_id: 0,
},
Lsn::INVALID,
Lsn::INVALID,
)
.await?;
tli.wal_residence_guard().await
}

View File

@@ -455,7 +455,6 @@ pub struct FullTimelineInfo {
/// Collects metrics for all active timelines.
pub struct TimelineCollector {
global_timelines: Arc<GlobalTimelines>,
descs: Vec<Desc>,
commit_lsn: GenericGaugeVec<AtomicU64>,
backup_lsn: GenericGaugeVec<AtomicU64>,
@@ -479,8 +478,14 @@ pub struct TimelineCollector {
active_timelines_count: IntGauge,
}
impl Default for TimelineCollector {
fn default() -> Self {
Self::new()
}
}
impl TimelineCollector {
pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
pub fn new() -> TimelineCollector {
let mut descs = Vec::new();
let commit_lsn = GenericGaugeVec::new(
@@ -671,7 +676,6 @@ impl TimelineCollector {
descs.extend(active_timelines_count.desc().into_iter().cloned());
TimelineCollector {
global_timelines,
descs,
commit_lsn,
backup_lsn,
@@ -724,18 +728,17 @@ impl Collector for TimelineCollector {
self.written_wal_seconds.reset();
self.flushed_wal_seconds.reset();
let timelines_count = self.global_timelines.get_all().len();
let timelines_count = GlobalTimelines::get_all().len();
let mut active_timelines_count = 0;
// Prometheus Collector is sync, and data is stored under async lock. To
// bridge the gap with a crutch, collect data in spawned thread with
// local tokio runtime.
let global_timelines = self.global_timelines.clone();
let infos = std::thread::spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.expect("failed to create rt");
rt.block_on(collect_timeline_metrics(global_timelines))
rt.block_on(collect_timeline_metrics())
})
.join()
.expect("collect_timeline_metrics thread panicked");
@@ -854,9 +857,9 @@ impl Collector for TimelineCollector {
}
}
async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
let mut res = vec![];
let active_timelines = global_timelines.get_global_broker_active_set().get_all();
let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
for tli in active_timelines {
if let Some(info) = tli.info_for_metrics().await {

View File

@@ -409,9 +409,8 @@ pub struct DebugDumpResponse {
pub async fn handle_request(
request: Request,
sk_auth_token: Option<SecretString>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<Response> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
request.tenant_id,
request.timeline_id,
));
@@ -454,14 +453,13 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
pull_timeline(status, safekeeper_host, sk_auth_token, global_timelines).await
pull_timeline(status, safekeeper_host, sk_auth_token).await
}
async fn pull_timeline(
status: TimelineStatus,
host: String,
sk_auth_token: Option<SecretString>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<Response> {
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
info!(
@@ -474,7 +472,7 @@ async fn pull_timeline(
status.acceptor_state.epoch
);
let conf = &global_timelines.get_global_config();
let conf = &GlobalTimelines::get_global_config();
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
@@ -533,9 +531,7 @@ async fn pull_timeline(
assert!(status.commit_lsn <= status.flush_lsn);
// Finally, load the timeline.
let _tli = global_timelines
.load_temp_timeline(ttid, &tli_dir_path, false)
.await?;
let _tli = GlobalTimelines::load_temp_timeline(ttid, &tli_dir_path, false).await?;
Ok(Response {
safekeeper_host: host,

View File

@@ -267,7 +267,6 @@ impl SafekeeperPostgresHandler {
pgb_reader: &mut pgb_reader,
peer_addr,
acceptor_handle: &mut acceptor_handle,
global_timelines: self.global_timelines.clone(),
};
// Read first message and create timeline if needed.
@@ -332,7 +331,6 @@ struct NetworkReader<'a, IO> {
// WalAcceptor is spawned when we learn server info from walproposer and
// create timeline; handle is put here.
acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
global_timelines: Arc<GlobalTimelines>,
}
impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
@@ -352,11 +350,10 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
system_id: greeting.system_id,
wal_seg_size: greeting.wal_seg_size,
};
let tli = self
.global_timelines
.create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
.await
.context("create timeline")?;
let tli =
GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
.await
.context("create timeline")?;
tli.wal_residence_guard().await?
}
_ => {

View File

@@ -10,6 +10,7 @@ use crate::timeline::WalResidentTimeline;
use crate::wal_reader_stream::WalReaderStreamBuilder;
use crate::wal_service::ConnectionId;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
use anyhow::{bail, Context as AnyhowContext};
use bytes::Bytes;
use futures::future::Either;
@@ -399,10 +400,7 @@ impl SafekeeperPostgresHandler {
start_pos: Lsn,
term: Option<Term>,
) -> Result<(), QueryError> {
let tli = self
.global_timelines
.get(self.ttid)
.map_err(|e| QueryError::Other(e.into()))?;
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
let residence_guard = tli.wal_residence_guard().await?;
if let Err(end) = self

View File

@@ -44,8 +44,8 @@ use crate::wal_backup_partial::PartialRemoteSegment;
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
use crate::SafeKeeperConf;
use crate::{debug_dump, timeline_manager, wal_storage};
use crate::{GlobalTimelines, SafeKeeperConf};
/// Things safekeeper should know about timeline state on peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -467,7 +467,6 @@ pub struct Timeline {
walreceivers: Arc<WalReceivers>,
timeline_dir: Utf8PathBuf,
manager_ctl: ManagerCtl,
conf: Arc<SafeKeeperConf>,
/// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
/// this gate, you must respect [`Timeline::cancel`]
@@ -490,7 +489,6 @@ impl Timeline {
timeline_dir: &Utf8Path,
remote_path: &RemotePath,
shared_state: SharedState,
conf: Arc<SafeKeeperConf>,
) -> Arc<Self> {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.state().commit_lsn);
@@ -518,7 +516,6 @@ impl Timeline {
gate: Default::default(),
cancel: CancellationToken::default(),
manager_ctl: ManagerCtl::new(),
conf,
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
last_removed_segno: AtomicU64::new(0),
@@ -527,14 +524,11 @@ impl Timeline {
}
/// Load existing timeline from disk.
pub fn load_timeline(
conf: Arc<SafeKeeperConf>,
ttid: TenantTimelineId,
) -> Result<Arc<Timeline>> {
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
let shared_state = SharedState::restore(conf, &ttid)?;
let timeline_dir = get_timeline_dir(conf, &ttid);
let remote_path = remote_timeline_path(&ttid)?;
Ok(Timeline::new(
@@ -542,7 +536,6 @@ impl Timeline {
&timeline_dir,
&remote_path,
shared_state,
conf,
))
}
@@ -611,7 +604,8 @@ impl Timeline {
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.close_wal_store();
if !only_local && self.conf.is_wal_backup_enabled() {
let conf = GlobalTimelines::get_global_config();
if !only_local && conf.is_wal_backup_enabled() {
// Note: we concurrently delete remote storage data from multiple
// safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
// do some retries anyway.
@@ -957,7 +951,7 @@ impl WalResidentTimeline {
pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
let (_, persisted_state) = self.get_state().await;
let enable_remote_read = self.conf.is_wal_backup_enabled();
let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
WalReader::new(
&self.ttid,
@@ -1067,6 +1061,7 @@ impl ManagerTimeline {
/// Try to switch state Offloaded->Present.
pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
let conf = GlobalTimelines::get_global_config();
let mut shared = self.write_shared_state().await;
// trying to restore WAL storage
@@ -1074,7 +1069,7 @@ impl ManagerTimeline {
&self.ttid,
&self.timeline_dir,
shared.sk.state(),
self.conf.no_sync,
conf.no_sync,
)?;
// updating control file
@@ -1101,7 +1096,7 @@ impl ManagerTimeline {
// now we can switch shared.sk to Present, shouldn't fail
let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
let cfile_state = prev_sk.take_state();
shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);
shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, conf.my_id)?);
Ok(())
}

View File

@@ -13,6 +13,7 @@ use crate::{control_file, wal_storage, SafeKeeperConf};
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::HashMap;
use std::str::FromStr;
@@ -41,16 +42,23 @@ struct GlobalTimelinesState {
// this map is dropped on restart.
tombstones: HashMap<TenantTimelineId, Instant>,
conf: Arc<SafeKeeperConf>,
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
global_rate_limiter: RateLimiter,
}
impl GlobalTimelinesState {
/// Get configuration, which must be set once during init.
fn get_conf(&self) -> &SafeKeeperConf {
self.conf
.as_ref()
.expect("GlobalTimelinesState conf is not initialized")
}
/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
(
self.conf.clone(),
self.get_conf().clone(),
self.broker_active_set.clone(),
self.global_rate_limiter.clone(),
)
@@ -74,39 +82,35 @@ impl GlobalTimelinesState {
}
}
/// A struct used to manage access to the global timelines map.
pub struct GlobalTimelines {
state: Mutex<GlobalTimelinesState>,
}
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
global_rate_limiter: RateLimiter::new(1, 1),
})
});
/// A zero-sized struct used to manage access to the global timelines map.
pub struct GlobalTimelines;
impl GlobalTimelines {
/// Create a new instance of the global timelines map.
pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
Self {
state: Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
global_rate_limiter: RateLimiter::new(1, 1),
}),
}
}
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
pub async fn init(&self) -> Result<()> {
pub async fn init(conf: SafeKeeperConf) -> Result<()> {
// clippy isn't smart enough to understand that drop(state) releases the
// lock, so use explicit block
let tenants_dir = {
let mut state = self.state.lock().unwrap();
let mut state = TIMELINES_STATE.lock().unwrap();
state.global_rate_limiter = RateLimiter::new(
state.conf.partial_backup_concurrency,
conf.partial_backup_concurrency,
DEFAULT_EVICTION_CONCURRENCY,
);
state.conf = Some(conf);
// Iterate through all directories and load tenants for all directories
// named as a valid tenant_id.
state.conf.workdir.clone()
state.get_conf().workdir.clone()
};
let mut tenant_count = 0;
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
@@ -118,7 +122,7 @@ impl GlobalTimelines {
TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
{
tenant_count += 1;
self.load_tenant_timelines(tenant_id).await?;
GlobalTimelines::load_tenant_timelines(tenant_id).await?;
}
}
Err(e) => error!(
@@ -131,7 +135,7 @@ impl GlobalTimelines {
info!(
"found {} tenants directories, successfully loaded {} timelines",
tenant_count,
self.state.lock().unwrap().timelines.len()
TIMELINES_STATE.lock().unwrap().timelines.len()
);
Ok(())
}
@@ -139,13 +143,13 @@ impl GlobalTimelines {
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
/// errors if any.
///
/// It is async, but self.state lock is sync and there is no important
/// It is async, but TIMELINES_STATE lock is sync and there is no important
/// reason to make it async (it is always held for a short while), so we
/// just lock and unlock it for each timeline -- this function is called
/// during init when nothing else is running, so this is fine.
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let state = self.state.lock().unwrap();
let state = TIMELINES_STATE.lock().unwrap();
state.get_dependencies()
};
@@ -159,10 +163,10 @@ impl GlobalTimelines {
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(conf.clone(), ttid) {
match Timeline::load_timeline(&conf, ttid) {
Ok(tli) => {
let mut shared_state = tli.write_shared_state().await;
self.state
TIMELINES_STATE
.lock()
.unwrap()
.timelines
@@ -196,30 +200,29 @@ impl GlobalTimelines {
}
/// Get the number of timelines in the map.
pub fn timelines_count(&self) -> usize {
self.state.lock().unwrap().timelines.len()
pub fn timelines_count() -> usize {
TIMELINES_STATE.lock().unwrap().timelines.len()
}
/// Get the global safekeeper config.
pub fn get_global_config(&self) -> Arc<SafeKeeperConf> {
self.state.lock().unwrap().conf.clone()
pub fn get_global_config() -> SafeKeeperConf {
TIMELINES_STATE.lock().unwrap().get_conf().clone()
}
pub fn get_global_broker_active_set(&self) -> Arc<TimelinesSet> {
self.state.lock().unwrap().broker_active_set.clone()
pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
}
/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub(crate) async fn create(
&self,
ttid: TenantTimelineId,
server_info: ServerInfo,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, _, _) = {
let state = self.state.lock().unwrap();
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
return Ok(timeline);
@@ -242,7 +245,7 @@ impl GlobalTimelines {
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
Ok(timeline)
}
@@ -258,14 +261,13 @@ impl GlobalTimelines {
/// 2) move the directory and load the timeline
/// 3) take lock again and insert the timeline into the global map.
pub async fn load_temp_timeline(
&self,
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
check_tombstone: bool,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let mut state = self.state.lock().unwrap();
let mut state = TIMELINES_STATE.lock().unwrap();
match state.timelines.get(&ttid) {
Some(GlobalMapTimeline::CreationInProgress) => {
bail!(TimelineError::CreationInProgress(ttid));
@@ -293,10 +295,10 @@ impl GlobalTimelines {
};
// Do the actual move and reflect the result in the map.
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, &conf).await {
Ok(timeline) => {
let mut timeline_shared_state = timeline.write_shared_state().await;
let mut state = self.state.lock().unwrap();
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(matches!(
state.timelines.get(&ttid),
Some(GlobalMapTimeline::CreationInProgress)
@@ -317,7 +319,7 @@ impl GlobalTimelines {
}
Err(e) => {
// Init failed, remove the marker from the map
let mut state = self.state.lock().unwrap();
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(matches!(
state.timelines.get(&ttid),
Some(GlobalMapTimeline::CreationInProgress)
@@ -332,10 +334,10 @@ impl GlobalTimelines {
async fn install_temp_timeline(
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
conf: Arc<SafeKeeperConf>,
conf: &SafeKeeperConf,
) -> Result<Arc<Timeline>> {
let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
let tenant_path = get_tenant_dir(conf, &ttid.tenant_id);
let timeline_path = get_timeline_dir(conf, &ttid);
// We must have already checked that timeline doesn't exist in the map,
// but there might be existing datadir: if timeline is corrupted it is
@@ -380,9 +382,9 @@ impl GlobalTimelines {
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
/// i.e. loaded in memory and not cancelled.
pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
let tli_res = {
let state = self.state.lock().unwrap();
let state = TIMELINES_STATE.lock().unwrap();
state.get(&ttid)
};
match tli_res {
@@ -397,8 +399,8 @@ impl GlobalTimelines {
}
/// Returns all timelines. This is used for background timeline processes.
pub fn get_all(&self) -> Vec<Arc<Timeline>> {
let global_lock = self.state.lock().unwrap();
pub fn get_all() -> Vec<Arc<Timeline>> {
let global_lock = TIMELINES_STATE.lock().unwrap();
global_lock
.timelines
.values()
@@ -417,8 +419,8 @@ impl GlobalTimelines {
/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
/// and that's why it can return cancelled timelines, to retry deleting them.
fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
let global_lock = self.state.lock().unwrap();
fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
let global_lock = TIMELINES_STATE.lock().unwrap();
global_lock
.timelines
.values()
@@ -433,12 +435,11 @@ impl GlobalTimelines {
/// Cancels timeline, then deletes the corresponding data directory.
/// If only_local, doesn't remove WAL segments in remote storage.
pub(crate) async fn delete(
&self,
ttid: &TenantTimelineId,
only_local: bool,
) -> Result<TimelineDeleteForceResult> {
let tli_res = {
let state = self.state.lock().unwrap();
let state = TIMELINES_STATE.lock().unwrap();
if state.tombstones.contains_key(ttid) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
@@ -471,7 +472,7 @@ impl GlobalTimelines {
}
Err(_) => {
// Timeline is not memory, but it may still exist on disk in broken state.
let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid);
let dir_existed = delete_dir(dir_path)?;
Ok(TimelineDeleteForceResult {
@@ -484,7 +485,7 @@ impl GlobalTimelines {
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
// are used to prevent still-running computes from re-creating the same timeline when they send data,
// and to speed up repeated deletion calls by avoiding re-listing objects.
self.state.lock().unwrap().delete(*ttid);
TIMELINES_STATE.lock().unwrap().delete(*ttid);
result
}
@@ -496,18 +497,17 @@ impl GlobalTimelines {
///
/// If only_local, doesn't remove WAL segments in remote storage.
pub async fn delete_force_all_for_tenant(
&self,
tenant_id: &TenantId,
only_local: bool,
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
let to_delete = self.get_all_for_tenant(*tenant_id);
let to_delete = Self::get_all_for_tenant(*tenant_id);
let mut err = None;
let mut deleted = HashMap::new();
for tli in &to_delete {
match self.delete(&tli.ttid, only_local).await {
match Self::delete(&tli.ttid, only_local).await {
Ok(result) => {
deleted.insert(tli.ttid, result);
}
@@ -529,15 +529,15 @@ impl GlobalTimelines {
// so the directory may be not empty. In this case timelines will have bad state
// and timeline background jobs can panic.
delete_dir(get_tenant_dir(
self.state.lock().unwrap().conf.as_ref(),
TIMELINES_STATE.lock().unwrap().get_conf(),
tenant_id,
))?;
Ok(deleted)
}
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
let mut state = self.state.lock().unwrap();
pub fn housekeeping(tombstone_ttl: &Duration) {
let mut state = TIMELINES_STATE.lock().unwrap();
// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they

View File

@@ -4,7 +4,6 @@
//!
use anyhow::{Context, Result};
use postgres_backend::QueryError;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutReader;
@@ -12,9 +11,9 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{auth::Scope, measured_stream::MeasuredStream};
use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::TrafficMetrics;
use crate::SafeKeeperConf;
use crate::{handler::SafekeeperPostgresHandler, GlobalTimelines};
use postgres_backend::{AuthType, PostgresBackend};
/// Accept incoming TCP connections and spawn them into a background thread.
@@ -23,10 +22,9 @@ use postgres_backend::{AuthType, PostgresBackend};
/// to any tenant are allowed) or Tenant (only tokens giving access to specific
/// tenant are allowed). Doesn't matter if auth is disabled in conf.
pub async fn task_main(
conf: Arc<SafeKeeperConf>,
conf: SafeKeeperConf,
pg_listener: std::net::TcpListener,
allowed_auth_scope: Scope,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
// Tokio's from_std won't do this for us, per its comment.
pg_listener.set_nonblocking(true)?;
@@ -39,10 +37,10 @@ pub async fn task_main(
debug!("accepted connection from {}", peer_addr);
let conf = conf.clone();
let conn_id = issue_connection_id(&mut connection_count);
let global_timelines = global_timelines.clone();
tokio::spawn(
async move {
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await {
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope).await {
error!("connection handler exited: {}", err);
}
}
@@ -55,10 +53,9 @@ pub async fn task_main(
///
async fn handle_socket(
socket: TcpStream,
conf: Arc<SafeKeeperConf>,
conf: SafeKeeperConf,
conn_id: ConnectionId,
allowed_auth_scope: Scope,
global_timelines: Arc<GlobalTimelines>,
) -> Result<(), QueryError> {
socket.set_nodelay(true)?;
let peer_addr = socket.peer_addr()?;
@@ -99,13 +96,8 @@ async fn handle_socket(
Some(_) => AuthType::NeonJWT,
};
let auth_pair = auth_key.map(|key| (allowed_auth_scope, key));
let mut conn_handler = SafekeeperPostgresHandler::new(
conf,
conn_id,
Some(traffic_metrics.clone()),
auth_pair,
global_timelines,
);
let mut conn_handler =
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()), auth_pair);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.

View File

@@ -56,3 +56,6 @@ utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }
control_plane = { path = "../control_plane" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[dev-dependencies]
test-log = {version="0.2.16"}

View File

@@ -636,13 +636,6 @@ impl Persistence {
.into_boxed(),
};
// Clear generation_pageserver if we are moving into a state where we won't have
// any attached pageservers.
let input_generation_pageserver = match input_placement_policy {
None | Some(PlacementPolicy::Attached(_)) => None,
Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None),
};
#[derive(AsChangeset)]
#[diesel(table_name = crate::schema::tenant_shards)]
struct ShardUpdate {
@@ -650,7 +643,6 @@ impl Persistence {
placement_policy: Option<String>,
config: Option<String>,
scheduling_policy: Option<String>,
generation_pageserver: Option<Option<i64>>,
}
let update = ShardUpdate {
@@ -663,7 +655,6 @@ impl Persistence {
.map(|c| serde_json::to_string(&c).unwrap()),
scheduling_policy: input_scheduling_policy
.map(|p| serde_json::to_string(&p).unwrap()),
generation_pageserver: input_generation_pageserver,
};
query.set(update).execute(conn)?;

View File

@@ -809,7 +809,21 @@ impl Reconciler {
if self.cancel.is_cancelled() {
return Err(ReconcileError::Cancel);
}
self.location_config(&node, conf, None, false).await?;
// We only try to configure secondary locations if the node is available. This does
// not stop us succeeding with the reconcile, because our core goal is to make the
// shard _available_ (the attached location), and configuring secondary locations
// can be done lazily when the node becomes available (via background reconciliation).
if node.is_available() {
self.location_config(&node, conf, None, false).await?;
} else {
// If the node is unavailable, we skip and consider the reconciliation successful: this
// is a common case where a pageserver is marked unavailable: we demote a location on
// that unavailable pageserver to secondary.
tracing::info!("Skipping configuring secondary location {node}, it is unavailable");
self.observed
.locations
.insert(node.get_id(), ObservedStateLocation { conf: None });
}
}
// The condition below identifies a detach. We must have no attached intent and

View File

@@ -47,6 +47,12 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Option<Self>;
/// Return a score that drops any components based on node utilization: this is useful
/// for finding scores for scheduling optimisation, when we want to avoid rescheduling
/// shards due to e.g. disk usage, to avoid flapping.
fn for_optimization(&self) -> Self;
fn is_overloaded(&self) -> bool;
fn node_id(&self) -> NodeId;
}
@@ -136,17 +142,13 @@ impl PartialOrd for SecondaryAzMatch {
/// Ordering is given by member declaration order (top to bottom).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub(crate) struct NodeAttachmentSchedulingScore {
/// The number of shards belonging to the tenant currently being
/// scheduled that are attached to this node.
affinity_score: AffinityScore,
/// Flag indicating whether this node matches the preferred AZ
/// of the shard. For equal affinity scores, nodes in the matching AZ
/// are considered first.
az_match: AttachmentAzMatch,
/// Size of [`ScheduleContext::attached_nodes`] for the current node.
/// This normally tracks the number of attached shards belonging to the
/// tenant being scheduled that are already on this node.
attached_shards_in_context: usize,
/// The number of shards belonging to the tenant currently being
/// scheduled that are attached to this node.
affinity_score: AffinityScore,
/// Utilisation score that combines shard count and disk utilisation
utilization_score: u64,
/// Total number of shards attached to this node. When nodes have identical utilisation, this
@@ -177,13 +179,25 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
.copied()
.unwrap_or(AffinityScore::FREE),
az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
utilization_score: utilization.cached_score(),
total_attached_shard_count: node.attached_shard_count,
node_id: *node_id,
})
}
/// For use in scheduling optimisation, where we only want to consider the aspects
/// of the score that can only be resolved by moving things (such as inter-shard affinity
/// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which
/// can fluctuate for other reasons)
fn for_optimization(&self) -> Self {
Self {
utilization_score: 0,
total_attached_shard_count: 0,
node_id: NodeId(0),
..*self
}
}
fn is_overloaded(&self) -> bool {
PageserverUtilization::is_overloaded(self.utilization_score)
}
@@ -242,6 +256,15 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
})
}
fn for_optimization(&self) -> Self {
Self {
utilization_score: 0,
total_attached_shard_count: 0,
node_id: NodeId(0),
..*self
}
}
fn is_overloaded(&self) -> bool {
PageserverUtilization::is_overloaded(self.utilization_score)
}
@@ -293,6 +316,10 @@ impl AffinityScore {
pub(crate) fn inc(&mut self) {
self.0 += 1;
}
pub(crate) fn dec(&mut self) {
self.0 -= 1;
}
}
impl std::ops::Add for AffinityScore {
@@ -353,15 +380,44 @@ impl ScheduleContext {
*entry += 1;
}
pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
self.nodes
.get(&node_id)
.copied()
.unwrap_or(AffinityScore::FREE)
/// Imagine we migrated our attached location to the given node. Return a new context that
/// reflects this.
pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self {
let mut new_context = self.clone();
if let Some(attached) = shard.intent.get_attached() {
if let Some(count) = new_context.attached_nodes.get_mut(attached) {
// It's unexpected that we get called in a context where the source of
// the migration is not already in the context.
debug_assert!(*count > 0);
if *count > 0 {
*count -= 1;
}
}
if let Some(score) = new_context.nodes.get_mut(attached) {
score.dec();
}
}
for secondary in shard.intent.get_secondary() {
if let Some(score) = new_context.nodes.get_mut(secondary) {
score.dec();
}
}
new_context
}
pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
self.attached_nodes.get(&node_id).copied().unwrap_or(0)
pub(crate) fn project_secondary_detach(&self, source: NodeId) -> Self {
let mut new_context = self.clone();
if let Some(score) = new_context.nodes.get_mut(&source) {
score.dec();
}
new_context
}
#[cfg(test)]
@@ -636,6 +692,22 @@ impl Scheduler {
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
}
/// Calculate a single node's score, used in optimizer logic to compare specific
/// nodes' scores.
pub(crate) fn compute_node_score<Score>(
&mut self,
node_id: NodeId,
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Option<Score>
where
Score: NodeSchedulingScore,
{
self.nodes
.get_mut(&node_id)
.and_then(|node| Score::generate(&node_id, node, preferred_az, context))
}
/// Compute a schedulling score for each node that the scheduler knows of
/// minus a set of hard excluded nodes.
fn compute_node_scores<Score>(
@@ -727,7 +799,7 @@ impl Scheduler {
tracing::info!(
"scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
);
);
}
// Note that we do not update shard count here to reflect the scheduling: that
@@ -742,6 +814,10 @@ impl Scheduler {
self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
}
pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
self.nodes.get(node_id).map(|n| n.az.clone())
}
/// Unit test access to internal state
#[cfg(test)]
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
@@ -799,7 +875,14 @@ pub(crate) mod test_utils {
#[cfg(test)]
mod tests {
use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
use pageserver_api::{
controller_api::NodeAvailability, models::utilization::test_utilization,
shard::ShardIdentity,
};
use utils::{
id::TenantId,
shard::{ShardCount, ShardNumber, TenantShardId},
};
use super::*;
@@ -1045,9 +1128,9 @@ mod tests {
&mut context,
);
// Node 2 is not in "az-a", but it has the lowest affinity so we prefer that.
// Node 1 and 3 (az-a) have same affinity score, so prefer the lowest node id.
assert_scheduler_chooses::<AttachedShardTag>(
NodeId(2),
NodeId(1),
Some(az_a_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
@@ -1063,28 +1146,196 @@ mod tests {
&mut context,
);
// Avoid nodes in "az-b" for the secondary location.
// Nodes 1 and 3 are identically loaded, so prefer the lowest node id.
assert_scheduler_chooses::<SecondaryShardTag>(
NodeId(1),
Some(az_b_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
// Avoid nodes in "az-b" for the secondary location.
// Node 3 has lower affinity score than 1, so prefer that.
assert_scheduler_chooses::<SecondaryShardTag>(
NodeId(3),
Some(az_b_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
for mut intent in scheduled_intents {
intent.clear(&mut scheduler);
}
}
use test_log::test;
#[test]
/// Make sure that when we have an odd number of nodes and an even number of shards, we still
/// get scheduling stability.
fn odd_nodes_stability() {
let az_a = AvailabilityZone("az-a".to_string());
let az_b = AvailabilityZone("az-b".to_string());
let nodes = test_utils::make_test_nodes(
10,
&[
az_a.clone(),
az_a.clone(),
az_a.clone(),
az_a.clone(),
az_a.clone(),
az_b.clone(),
az_b.clone(),
az_b.clone(),
az_b.clone(),
az_b.clone(),
],
);
let mut scheduler = Scheduler::new(nodes.values());
// Need to keep these alive because they contribute to shard counts via RAII
let mut scheduled_shards = Vec::new();
let mut context = ScheduleContext::default();
fn schedule_shard(
tenant_shard_id: TenantShardId,
expect_attached: NodeId,
expect_secondary: NodeId,
scheduled_shards: &mut Vec<TenantShard>,
scheduler: &mut Scheduler,
preferred_az: Option<AvailabilityZone>,
context: &mut ScheduleContext,
) {
let shard_identity = ShardIdentity::new(
tenant_shard_id.shard_number,
tenant_shard_id.shard_count,
pageserver_api::shard::ShardStripeSize(1),
)
.unwrap();
let mut shard = TenantShard::new(
tenant_shard_id,
shard_identity,
pageserver_api::controller_api::PlacementPolicy::Attached(1),
preferred_az,
);
shard.schedule(scheduler, context).unwrap();
assert_eq!(shard.intent.get_attached().unwrap(), expect_attached);
assert_eq!(
shard.intent.get_secondary().first().unwrap(),
&expect_secondary
);
scheduled_shards.push(shard);
}
let tenant_id = TenantId::generate();
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(8),
},
NodeId(1),
NodeId(6),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(1),
shard_count: ShardCount(8),
},
NodeId(2),
NodeId(7),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(2),
shard_count: ShardCount(8),
},
NodeId(3),
NodeId(8),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(3),
shard_count: ShardCount(8),
},
NodeId(4),
NodeId(9),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(4),
shard_count: ShardCount(8),
},
NodeId(5),
NodeId(10),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(5),
shard_count: ShardCount(8),
},
NodeId(1),
NodeId(6),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(6),
shard_count: ShardCount(8),
},
NodeId(2),
NodeId(7),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(7),
shard_count: ShardCount(8),
},
NodeId(3),
NodeId(8),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
// Assert that the optimizer suggests nochanges, i.e. our initial scheduling was stable.
for shard in &scheduled_shards {
assert_eq!(shard.optimize_attachment(&mut scheduler, &context), None);
}
for mut shard in scheduled_shards {
shard.intent.clear(&mut scheduler);
}
}
}

View File

@@ -29,7 +29,7 @@ use crate::{
ShardGenerationState, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
ScheduleOptimization, ScheduleOptimizationAction,
@@ -513,9 +513,6 @@ struct ShardUpdate {
/// If this is None, generation is not updated.
generation: Option<Generation>,
/// If this is None, scheduling policy is not updated.
scheduling_policy: Option<ShardSchedulingPolicy>,
}
enum StopReconciliationsReason {
@@ -792,7 +789,7 @@ impl Service {
node_list_futs.push({
async move {
tracing::info!("Scanning shards on node {node}...");
let timeout = Duration::from_secs(5);
let timeout = Duration::from_secs(1);
let response = node
.with_client_retries(
|client| async move { client.list_location_config().await },
@@ -1579,6 +1576,7 @@ impl Service {
attach_req.tenant_shard_id,
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
None,
),
);
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
@@ -2106,6 +2104,21 @@ impl Service {
)
};
let preferred_az_id: Option<AvailabilityZone> = {
let mut locked = self.inner.write().unwrap();
// Idempotency: take the existing value if the tenant already exists
if let Some(shard) = locked.tenants.get(create_ids.first().unwrap()) {
shard.preferred_az().cloned()
} else {
locked
.scheduler
.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
.ok()
.and_then(|n_id| locked.scheduler.get_node_az(&n_id))
}
};
// Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
// to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
// during the creation, rather than risking leaving orphan objects in S3.
@@ -2125,7 +2138,7 @@ impl Service {
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: None,
preferred_az_id: preferred_az_id.as_ref().map(|az| az.to_string()),
})
.collect();
@@ -2161,6 +2174,7 @@ impl Service {
&create_req.shard_parameters,
create_req.config.clone(),
placement_policy.clone(),
preferred_az_id.as_ref(),
&mut schedule_context,
)
.await;
@@ -2174,44 +2188,6 @@ impl Service {
}
}
let preferred_azs = {
let locked = self.inner.read().unwrap();
response_shards
.iter()
.filter_map(|resp| {
let az_id = locked
.nodes
.get(&resp.node_id)
.map(|n| n.get_availability_zone_id().clone())?;
Some((resp.shard_id, az_id))
})
.collect::<Vec<_>>()
};
// Note that we persist the preferred AZ for the new shards separately.
// In theory, we could "peek" the scheduler to determine where the shard will
// land, but the subsequent "real" call into the scheduler might select a different
// node. Hence, we do this awkward update to keep things consistent.
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred az ids: {err}"
))
})?;
{
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
if let Some(shard) = locked.tenants.get_mut(&tid) {
shard.set_preferred_az(az_id);
}
}
}
// If we failed to schedule shards, then they are still created in the controller,
// but we return an error to the requester to avoid a silent failure when someone
// tries to e.g. create a tenant whose placement policy requires more nodes than
@@ -2242,6 +2218,7 @@ impl Service {
/// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
/// case of a new tenant and a pre-existing one.
#[allow(clippy::too_many_arguments)]
async fn do_initial_shard_scheduling(
&self,
tenant_shard_id: TenantShardId,
@@ -2249,6 +2226,7 @@ impl Service {
shard_params: &ShardParameters,
config: TenantConfig,
placement_policy: PlacementPolicy,
preferred_az_id: Option<&AvailabilityZone>,
schedule_context: &mut ScheduleContext,
) -> InitialShardScheduleOutcome {
let mut locked = self.inner.write().unwrap();
@@ -2286,6 +2264,7 @@ impl Service {
tenant_shard_id,
ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
placement_policy,
preferred_az_id.cloned(),
));
state.generation = initial_generation;
@@ -2379,23 +2358,6 @@ impl Service {
}
};
// Ordinarily we do not update scheduling policy, but when making major changes
// like detaching or demoting to secondary-only, we need to force the scheduling
// mode to Active, or the caller's expected outcome (detach it) will not happen.
let scheduling_policy = match req.config.mode {
LocationConfigMode::Detached | LocationConfigMode::Secondary => {
// Special case: when making major changes like detaching or demoting to secondary-only,
// we need to force the scheduling mode to Active, or nothing will happen.
Some(ShardSchedulingPolicy::Active)
}
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// While attached, continue to respect whatever the existing scheduling mode is.
None
}
};
let mut create = true;
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
// Saw an existing shard: this is not a creation
@@ -2421,7 +2383,6 @@ impl Service {
placement_policy: placement_policy.clone(),
tenant_config: req.config.tenant_conf.clone(),
generation: set_generation,
scheduling_policy,
});
}
@@ -2518,7 +2479,6 @@ impl Service {
placement_policy,
tenant_config,
generation,
scheduling_policy,
} in &updates
{
self.persistence
@@ -2527,7 +2487,7 @@ impl Service {
Some(placement_policy.clone()),
Some(tenant_config.clone()),
*generation,
*scheduling_policy,
None,
)
.await?;
}
@@ -2543,7 +2503,6 @@ impl Service {
placement_policy,
tenant_config,
generation: update_generation,
scheduling_policy,
} in updates
{
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
@@ -2562,10 +2521,6 @@ impl Service {
shard.generation = Some(generation);
}
if let Some(scheduling_policy) = scheduling_policy {
shard.set_scheduling_policy(scheduling_policy);
}
shard.schedule(scheduler, &mut schedule_context)?;
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
@@ -3019,17 +2974,9 @@ impl Service {
let TenantPolicyRequest {
placement,
mut scheduling,
scheduling,
} = req;
if let Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) = placement {
// When someone configures a tenant to detach, we force the scheduling policy to enable
// this to take effect.
if scheduling.is_none() {
scheduling = Some(ShardSchedulingPolicy::Active);
}
}
self.persistence
.update_tenant_shard(
TenantFilter::Tenant(tenant_id),
@@ -4184,16 +4131,14 @@ impl Service {
},
);
let mut child_state = TenantShard::new(child, child_shard, policy.clone());
let mut child_state =
TenantShard::new(child, child_shard, policy.clone(), preferred_az.clone());
child_state.intent = IntentState::single(scheduler, Some(pageserver));
child_state.observed = ObservedState {
locations: child_observed,
};
child_state.generation = Some(generation);
child_state.config = config.clone();
if let Some(preferred_az) = &preferred_az {
child_state.set_preferred_az(preferred_az.clone());
}
// The child's TenantShard::splitting is intentionally left at the default value of Idle,
// as at this point in the split process we have succeeded and this part is infallible:
@@ -5386,7 +5331,7 @@ impl Service {
register_req.listen_http_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.availability_zone_id,
register_req.availability_zone_id.clone(),
);
// TODO: idempotency if the node already exists in the database
@@ -5406,8 +5351,9 @@ impl Service {
.set(locked.nodes.len() as i64);
tracing::info!(
"Registered pageserver {}, now have {} pageservers",
"Registered pageserver {} ({}), now have {} pageservers",
register_req.node_id,
register_req.availability_zone_id,
locked.nodes.len()
);
Ok(())
@@ -6144,11 +6090,17 @@ impl Service {
// How many candidate optimizations we will generate, before evaluating them for readniess: setting
// this higher than the execution limit gives us a chance to execute some work even if the first
// few optimizations we find are not ready.
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 2;
let mut work = Vec::new();
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let (_nodes, tenants, scheduler) = locked.parts_mut();
// We are going to plan a bunch of optimisations before applying any of them, so the
// utilisation stats on nodes will be effectively stale for the >1st optimisation we
// generate. To avoid this causing unstable migrations/flapping, it's important that the
// code in TenantShard for finding optimisations uses [`NodeAttachmentSchedulingScore::disregard_utilization`]
// to ignore the utilisation component of the score.
for (_tenant_id, schedule_context, shards) in
TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
@@ -6184,7 +6136,7 @@ impl Service {
if let Some(optimization) =
// If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
// its primary location based on soft constraints, cut it over.
shard.optimize_attachment(nodes, &schedule_context)
shard.optimize_attachment(scheduler, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
@@ -6243,8 +6195,10 @@ impl Service {
}
}
}
ScheduleOptimizationAction::ReplaceSecondary(_) => {
// No extra checks needed to replace a secondary: this does not interrupt client access
ScheduleOptimizationAction::ReplaceSecondary(_)
| ScheduleOptimizationAction::CreateSecondary(_)
| ScheduleOptimizationAction::RemoveSecondary(_) => {
// No extra checks needed to manage secondaries: this does not interrupt client access
validated_work.push((tenant_shard_id, optimization))
}
};
@@ -6316,26 +6270,35 @@ impl Service {
/// we have this helper to move things along faster.
#[cfg(feature = "testing")]
async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) {
let (attached_node, secondary_node) = {
let (attached_node, secondaries) = {
let locked = self.inner.read().unwrap();
let Some(shard) = locked.tenants.get(&tenant_shard_id) else {
tracing::warn!(
"Skipping kick of secondary download for {tenant_shard_id}: not found"
);
return;
};
let (Some(attached), Some(secondary)) = (
shard.intent.get_attached(),
shard.intent.get_secondary().first(),
) else {
let Some(attached) = shard.intent.get_attached() else {
tracing::warn!(
"Skipping kick of secondary download for {tenant_shard_id}: no attached"
);
return;
};
(
locked.nodes.get(attached).unwrap().clone(),
locked.nodes.get(secondary).unwrap().clone(),
)
let secondaries = shard
.intent
.get_secondary()
.iter()
.map(|n| locked.nodes.get(n).unwrap().clone())
.collect::<Vec<_>>();
(locked.nodes.get(attached).unwrap().clone(), secondaries)
};
// Make remote API calls to upload + download heatmaps: we ignore errors because this is just
// a 'kick' to let scheduling optimisation run more promptly.
attached_node
match attached_node
.with_client_retries(
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
&self.config.jwt_token,
@@ -6344,22 +6307,57 @@ impl Service {
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
.await
{
Some(Err(e)) => {
tracing::info!(
"Failed to upload heatmap from {attached_node} for {tenant_shard_id}: {e}"
);
}
None => {
tracing::info!(
"Cancelled while uploading heatmap from {attached_node} for {tenant_shard_id}"
);
}
Some(Ok(_)) => {
tracing::info!(
"Successfully uploaded heatmap from {attached_node} for {tenant_shard_id}"
);
}
}
secondary_node
.with_client_retries(
|client| async move {
client
.tenant_secondary_download(tenant_shard_id, Some(Duration::from_secs(1)))
.await
},
&self.config.jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
for secondary_node in secondaries {
match secondary_node
.with_client_retries(
|client| async move {
client
.tenant_secondary_download(
tenant_shard_id,
Some(Duration::from_secs(1)),
)
.await
},
&self.config.jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await
{
Some(Err(e)) => {
tracing::info!(
"Failed to download heatmap from {secondary_node} for {tenant_shard_id}: {e}"
);
}
None => {
tracing::info!("Cancelled while downloading heatmap from {secondary_node} for {tenant_shard_id}");
}
Some(Ok(progress)) => {
tracing::info!("Successfully downloaded heatmap from {secondary_node} for {tenant_shard_id}: {progress:?}");
}
}
}
}
/// Look for shards which are oversized and in need of splitting
@@ -6795,9 +6793,15 @@ impl Service {
fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
let mut locked = self.inner.write().unwrap();
let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
let (nodes, tenants, _scheduler) = locked.parts_mut();
let mut tids_by_node = locked
.tenants
let node_az = nodes
.get(&node_id)
.expect("Node must exist")
.get_availability_zone_id()
.clone();
let mut tids_by_node = tenants
.iter_mut()
.filter_map(|(tid, tenant_shard)| {
if !matches!(
@@ -6810,6 +6814,25 @@ impl Service {
return None;
}
// AZ check: when filling nodes after a restart, our intent is to move _back_ the
// shards which belong on this node, not to promote shards whose scheduling preference
// would be on their currently attached node. So will avoid promoting shards whose
// home AZ doesn't match the AZ of the node we're filling.
match tenant_shard.preferred_az() {
None => {
// Shard doesn't have an AZ preference: it is elegible to be moved.
}
Some(az) if az == &node_az => {
// This shard's home AZ is equal to the node we're filling: it is
// elegible to be moved: fall through;
}
Some(_) => {
// This shard's home AZ is somewhere other than the node we're filling:
// do not include it in the fill plan.
return None;
}
}
if tenant_shard.intent.get_secondary().contains(&node_id) {
if let Some(primary) = tenant_shard.intent.get_attached() {
return Some((*primary, *tid));

View File

@@ -11,16 +11,14 @@ use crate::{
persistence::TenantShardPersistence,
reconciler::{ReconcileUnits, ReconcilerConfig},
scheduler::{
AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
SecondaryShardTag,
AttachedShardTag, NodeAttachmentSchedulingScore, NodeSchedulingScore,
NodeSecondarySchedulingScore, RefCountUpdate, ScheduleContext, SecondaryShardTag,
},
service::ReconcileResultRequest,
};
use futures::future::{self, Either};
use itertools::Itertools;
use pageserver_api::controller_api::{
AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
};
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
@@ -315,6 +313,7 @@ pub(crate) struct ObservedStateLocation {
/// we know that we might have some state on this node.
pub(crate) conf: Option<LocationConfig>,
}
pub(crate) struct ReconcilerWaiter {
// For observability purposes, remember the ID of the shard we're
// waiting for.
@@ -360,6 +359,10 @@ pub(crate) enum ScheduleOptimizationAction {
ReplaceSecondary(ReplaceSecondary),
// Migrate attachment to an existing secondary location
MigrateAttachment(MigrateAttachment),
// Create a secondary location, with the intent of later migrating to it
CreateSecondary(NodeId),
// Remove a secondary location that we previously created to facilitate a migration
RemoveSecondary(NodeId),
}
#[derive(Eq, PartialEq, Debug, Clone)]
@@ -472,6 +475,7 @@ impl TenantShard {
tenant_shard_id: TenantShardId,
shard: ShardIdentity,
policy: PlacementPolicy,
preferred_az_id: Option<AvailabilityZone>,
) -> Self {
metrics::METRICS_REGISTRY
.metrics_group
@@ -495,7 +499,7 @@ impl TenantShard {
last_error: Arc::default(),
pending_compute_notification: false,
scheduling_policy: ShardSchedulingPolicy::default(),
preferred_az_id: None,
preferred_az_id,
}
}
@@ -626,24 +630,7 @@ impl TenantShard {
use PlacementPolicy::*;
match self.policy {
Attached(secondary_count) => {
let retain_secondaries = if self.intent.attached.is_none()
&& scheduler.node_preferred(&self.intent.secondary).is_some()
{
// If we have no attached, and one of the secondaries is elegible to be promoted, retain
// one more secondary than we usually would, as one of them will become attached futher down this function.
secondary_count + 1
} else {
secondary_count
};
while self.intent.secondary.len() > retain_secondaries {
// We have no particular preference for one secondary location over another: just
// arbitrarily drop from the end
self.intent.pop_secondary(scheduler);
modified = true;
}
// Should have exactly one attached, and N secondaries
// Should have exactly one attached, and at least N secondaries
let (modified_attached, attached_node_id) =
self.schedule_attached(scheduler, context)?;
modified |= modified_attached;
@@ -740,90 +727,258 @@ impl TenantShard {
Ok(())
}
/// Returns None if the current location's score is unavailable, i.e. cannot draw a conclusion
fn is_better_location(
&self,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
current: NodeId,
candidate: NodeId,
) -> Option<bool> {
let Some(candidate_score) = scheduler.compute_node_score::<NodeAttachmentSchedulingScore>(
candidate,
&self.preferred_az_id,
schedule_context,
) else {
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
return None;
};
match scheduler.compute_node_score::<NodeAttachmentSchedulingScore>(
current,
&self.preferred_az_id,
schedule_context,
) {
Some(current_score) => {
// Ignore utilization components when comparing scores: we don't want to migrate
// because of transient load variations, it risks making the system thrash, and
// migrating for utilization requires a separate high level view of the system to
// e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
// moving things around in the order that we hit this function.
let candidate_score = candidate_score.for_optimization();
let current_score = current_score.for_optimization();
if candidate_score < current_score {
tracing::info!("Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})");
Some(true)
} else {
// The candidate node is no better than our current location, so don't migrate
tracing::debug!(
"Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
);
Some(false)
}
}
None => {
// The current node is unavailable for scheduling, so we can't make any sensible
// decisions about optimisation. This should be a transient state -- if the node
// is offline then it will get evacuated, if is blocked by a scheduling mode
// then we will respect that mode by doing nothing.
tracing::debug!("Current node {current} is unavailable for scheduling");
None
}
}
}
fn find_better_location(
&self,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> Option<NodeId> {
// TODO: fast path: if the attached node is already in the preferred AZ, _and_ has no
// other shards from the same tenant on it, then skip doing any scheduling calculations.
let attached = (*self.intent.get_attached())?;
tracing::info!(
"Initially: attached {attached} ({:?} vs {:?}), in context {schedule_context:?}",
scheduler.get_node_az(&attached),
self.preferred_az_id.as_ref()
);
// Construct a schedule context that excludes locations belonging to
// this shard: this simulates removing and re-scheduling the shard
// let schedule_context = schedule_context.project_detach(self);
// Look for a lower-scoring location to attach to
let Ok(candidate_node) = scheduler.schedule_shard::<AttachedShardTag>(
&[], // Don't hard-exclude anything: we want to consider the possibility of migrating to somewhere we already have a secondary
&self.preferred_az_id,
schedule_context,
) else {
// A scheduling error means we have no possible candidate replacements
tracing::debug!("No candidate node found");
return None;
};
if candidate_node == attached {
// We're already at the best possible location, so don't migrate
tracing::debug!("Candidate node {candidate_node} is already attached");
return None;
}
// Check if our candidate node is in the preferred AZ: it might not be, if the scheduler
// is trying its best to handle an overloaded AZ.
if self.preferred_az_id.is_some()
&& scheduler.get_node_az(&candidate_node) != self.preferred_az_id
{
tracing::debug!(
"Candidate node {candidate_node} is not in preferred AZ {:?}",
self.preferred_az_id
);
return None;
}
self.is_better_location(scheduler, schedule_context, attached, candidate_node)
.and_then(|better| if better { Some(candidate_node) } else { None })
}
/// Optimize attachments: if a shard has a secondary location that is preferable to
/// its primary location based on soft constraints, switch that secondary location
/// to be attached.
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn optimize_attachment(
&self,
nodes: &HashMap<NodeId, Node>,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> Option<ScheduleOptimization> {
let attached = (*self.intent.get_attached())?;
if self.intent.secondary.is_empty() {
// We can only do useful work if we have both attached and secondary locations: this
// function doesn't schedule new locations, only swaps between attached and secondaries.
return None;
}
let current_affinity_score = schedule_context.get_node_affinity(attached);
let current_attachment_count = schedule_context.get_node_attachments(attached);
let schedule_context = schedule_context.project_detach(self);
// Generate score for each node, dropping any un-schedulable nodes.
let all_pageservers = self.intent.all_pageservers();
let mut scores = all_pageservers
.iter()
.flat_map(|node_id| {
let node = nodes.get(node_id);
if node.is_none() {
None
} else if matches!(
node.unwrap().get_scheduling(),
NodeSchedulingPolicy::Filling
) {
// If the node is currently filling, don't count it as a candidate to avoid,
// racing with the background fill.
None
} else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
None
} else {
let affinity_score = schedule_context.get_node_affinity(*node_id);
let attachment_count = schedule_context.get_node_attachments(*node_id);
Some((*node_id, affinity_score, attachment_count))
}
})
.collect::<Vec<_>>();
// Sort precedence:
// 1st - prefer nodes with the lowest total affinity score
// 2nd - prefer nodes with the lowest number of attachments in this context
// 3rd - if all else is equal, sort by node ID for determinism in tests.
scores.sort_by_key(|i| (i.1, i.2, i.0));
if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
scores.first()
{
if attached != *preferred_node {
// The best alternative must be more than 1 better than us, otherwise we could end
// up flapping back next time we're called (e.g. there's no point migrating from
// a location with score 1 to a score zero, because on next location the situation
// would be the same, but in reverse).
if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
|| current_attachment_count > *preferred_attachment_count + 1
{
tracing::info!(
"Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
self.intent.get_secondary()
);
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: *preferred_node,
}),
});
}
} else {
tracing::debug!(
"Node {} is already preferred (score {:?})",
preferred_node,
preferred_affinity_score
);
// If we already have a secondary that is higher-scoring than out current location,
// then simply migrate to it.
for secondary in self.intent.get_secondary() {
if let Some(true) =
self.is_better_location(scheduler, &schedule_context, attached, *secondary)
{
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: *secondary,
}),
});
}
}
// Fall-through: we didn't find an optimization
None
// Given that none of our current secondaries is a better location than our current
// attached location (checked above), we may trim any secondaries that are not needed
// for the placement policy.
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
// This code path cleans up extra secondaries after migrating, and/or
// trims extra secondaries after a PlacementPolicy::Attached(N) was
// modified to decrease N.
let mut secondary_scores = self
.intent
.get_secondary()
.iter()
.map(|node_id| {
(
*node_id,
scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
*node_id,
&self.preferred_az_id,
&schedule_context,
),
)
})
.collect::<Vec<_>>();
if secondary_scores.iter().any(|score| score.1.is_none()) {
// Don't have full list of scores, so can't make a good decision about which to drop unless
// there is an obvious one in the wrong AZ
for secondary in self.intent.get_secondary() {
if scheduler.get_node_az(secondary) == self.preferred_az_id {
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
});
}
}
// Fall through: we didn't identify one to remove. This ought to be rare.
tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
self.intent.get_secondary()
);
} else {
secondary_scores.sort_by_key(|score| score.1.unwrap());
let victim = secondary_scores.last().unwrap().0;
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(victim),
});
}
}
let replacement = self.find_better_location(scheduler, &schedule_context);
// We have found a candidate and confirmed that its score is preferable
// to our current location. See if we have a secondary location in the preferred location already: if not,
// then create one.
if let Some(replacement) = replacement {
if !self.intent.get_secondary().contains(&replacement) {
tracing::info!(
"Identified optimization({}): create secondary {replacement}",
self.tenant_shard_id
);
Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::CreateSecondary(replacement),
})
} else {
// We already have a secondary in the preferred location, let's try migrating to it. Our caller
// will check the warmth of the destination before deciding whether to really execute this.
tracing::info!(
"Identified optimization({}): migrate attachment {attached}->{replacement}",
self.tenant_shard_id
);
Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: replacement,
}),
})
}
// } else if self.intent.get_secondary().len() > self.policy.want_secondaries() {
// // We aren't in the process of migrating anywhere, and we're attached in our preferred AZ. If there are
// // any other secondary locations in our preferred AZ, we presume they were created to facilitate a migration
// // of the attached location, and remove them.
// for secondary in self.intent.get_secondary() {
// if scheduler.get_node_az(secondary) == self.preferred_az_id {
// tracing::info!(
// "Identified optimization({}): remove secondary {secondary}",
// self.tenant_shard_id
// );
// return Some(ScheduleOptimization {
// sequence: self.sequence,
// action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
// });
// }
// }
// // Fall through: maybe we had excess secondaries in other AZs? Trim them in an arbitrary order
// // (lowest Node ID first).
// let mut secondary_node_ids = self.intent.get_secondary().clone();
// secondary_node_ids.sort();
// let victim = secondary_node_ids
// .first()
// .expect("Within block for > check on secondary count");
// Some(ScheduleOptimization {
// sequence: self.sequence,
// action: ScheduleOptimizationAction::RemoveSecondary(*victim),
// })
} else {
// We didn't find somewhere we'd rather be, and we don't have any excess secondaries
// to clean up: no action required.
None
}
// TODO: if we find that our current location is optimal, _and_ we also have a secondary
// in the preferred AZ, then clean up that secondary: it was only created to act as a
// migration destination.
// ...maybe do this in optimize_secondary()?
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
@@ -832,18 +987,18 @@ impl TenantShard {
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> Option<ScheduleOptimization> {
if self.intent.secondary.is_empty() {
// We can only do useful work if we have both attached and secondary locations: this
// function doesn't schedule new locations, only swaps between attached and secondaries.
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
// We have extra secondaries, perhaps to facilitate a migration of the attached location:
// do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
// and we are called again, we will proceed.
tracing::debug!("Too many secondaries: skipping");
return None;
}
for secondary in self.intent.get_secondary() {
let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
// We're already on a node unaffected any affinity constraints,
// so we won't change it.
continue;
};
let schedule_context = schedule_context.project_secondary_detach(*secondary);
// TODO: fast path to avoid full scheduling calculation if we're in the right AZ and not
// sharing with any other shards in the same tenant
// Let the scheduler suggest a node, where it would put us if we were scheduling afresh
// This implicitly limits the choice to nodes that are available, and prefers nodes
@@ -851,33 +1006,63 @@ impl TenantShard {
let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
&self.intent.all_pageservers(),
&self.preferred_az_id,
schedule_context,
&schedule_context,
) else {
// A scheduling error means we have no possible candidate replacements
continue;
};
let candidate_affinity_score = schedule_context
.nodes
.get(&candidate_node)
.unwrap_or(&AffinityScore::FREE);
let Some(candidate_score) = scheduler
.compute_node_score::<NodeSecondarySchedulingScore>(
candidate_node,
&self.preferred_az_id,
&schedule_context,
)
else {
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
// This is unexpected, because schedule() yielded this node
debug_assert!(false);
continue;
};
// The best alternative must be more than 1 better than us, otherwise we could end
// up flapping back next time we're called.
if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
// If some other node is available and has a lower score than this node, then
// that other node is a good place to migrate to.
tracing::info!(
"Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
self.intent.get_secondary()
);
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id: *secondary,
new_node_id: candidate_node,
}),
});
match scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
*secondary,
&self.preferred_az_id,
&schedule_context,
) {
Some(current_score) => {
// Disregard utilization: we don't want to thrash around based on disk utilization
let current_score = current_score.for_optimization();
let candidate_score = candidate_score.for_optimization();
if candidate_score < current_score {
tracing::info!(
"Identified optimization({}, home AZ {:?}): replace secondary {secondary}->{candidate_node} (current secondaries {:?}) Candidate {:?} < current {:?} ",
self.tenant_shard_id,
self.preferred_az_id,
self.intent.get_secondary(),
candidate_score,
current_score
);
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::ReplaceSecondary(
ReplaceSecondary {
old_node_id: *secondary,
new_node_id: candidate_node,
},
),
});
}
}
None => {
// The current node is unavailable for scheduling, so we can't make any sensible
// decisions about optimisation. This should be a transient state -- if the node
// is offline then it will get evacuated, if is blocked by a scheduling mode
// then we will respect that mode by doing nothing.
tracing::debug!("Current node {secondary} is unavailable for scheduling");
continue;
}
}
}
@@ -916,6 +1101,12 @@ impl TenantShard {
self.intent.remove_secondary(scheduler, old_node_id);
self.intent.push_secondary(scheduler, new_node_id);
}
ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
self.intent.push_secondary(scheduler, new_node_id);
}
ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
self.intent.remove_secondary(scheduler, old_secondary);
}
}
true
@@ -1571,6 +1762,7 @@ pub(crate) mod tests {
)
.unwrap(),
policy,
None,
)
}
@@ -1606,6 +1798,7 @@ pub(crate) mod tests {
)
.unwrap(),
policy.clone(),
None,
);
if let Some(az) = &preferred_az {
@@ -1749,65 +1942,92 @@ pub(crate) mod tests {
}
#[test]
fn optimize_attachment() -> anyhow::Result<()> {
let nodes = make_test_nodes(3, &[]);
/// Simple case: moving attachment to somewhere better where we already have a secondary
fn optimize_attachment_simple() -> anyhow::Result<()> {
let nodes = make_test_nodes(
3,
&[
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
let mut scheduler = Scheduler::new(nodes.values());
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_b.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
// Initially: both nodes attached on shard 1, and both have secondary locations
// on different nodes.
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
schedule_context
}
let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
// Either shard should recognize that it has the option to switch to a secondary location where there
// would be no other shards from the same tenant, and request to do so.
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(2)
old_attached_node_id: NodeId(2),
new_attached_node_id: NodeId(1)
})
})
);
// Note that these optimizing two shards in the same tenant with the same ScheduleContext is
// mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
// of [`Service::optimize_all`] to avoid trying
// to do optimizations for multiple shards in the same tenant at the same time. Generating
// both optimizations is just done for test purposes
let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
assert_eq!(
optimization_b,
Some(ScheduleOptimization {
sequence: shard_b.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(3)
})
})
);
// Applying these optimizations should result in the end state proposed
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
// // Either shard should recognize that it has the option to switch to a secondary location where there
// // would be no other shards from the same tenant, and request to do so.
// assert_eq!(
// optimization_a_prepare,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
// assert_eq!(
// optimization_a_migrate,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
// old_attached_node_id: NodeId(1),
// new_attached_node_id: NodeId(2)
// })
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
// assert_eq!(
// optimization_a_cleanup,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
shard_a.intent.clear(&mut scheduler);
shard_b.intent.clear(&mut scheduler);
@@ -1815,6 +2035,168 @@ pub(crate) mod tests {
Ok(())
}
#[test]
/// Complicated case: moving attachment to somewhere better where we do not have a secondary
/// already, creating one as needed.
fn optimize_attachment_multistep() -> anyhow::Result<()> {
let nodes = make_test_nodes(
3,
&[
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
let mut scheduler = Scheduler::new(nodes.values());
// Two shards of a tenant that wants to be in AZ A
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_b.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
// Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
schedule_context
}
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_prepare,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_migrate,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(2),
new_attached_node_id: NodeId(1)
})
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_cleanup,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
shard_a.intent.clear(&mut scheduler);
shard_b.intent.clear(&mut scheduler);
Ok(())
}
#[test]
/// Check that multi-step migration works when moving to somewhere that is only better by
/// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
/// counting toward the affinity score such that it prevents the rest of the migration from happening.
fn optimize_attachment_marginal() -> anyhow::Result<()> {
let nodes = make_test_nodes(2, &[]);
let mut scheduler = Scheduler::new(nodes.values());
// Multi-sharded tenant, we will craft a situation where affinity
// scores differ only slightly
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
// 1 attached on node 1
shards[0]
.intent
.set_attached(&mut scheduler, Some(NodeId(1)));
// 2 attached and one secondary on node 2
shards[1]
.intent
.set_attached(&mut scheduler, Some(NodeId(2)));
shards[2]
.intent
.set_attached(&mut scheduler, Some(NodeId(2)));
shards[1].intent.push_secondary(&mut scheduler, NodeId(2));
fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
for shard in shards {
schedule_context.avoid(&shard.intent.all_pageservers());
if let Some(attached) = shard.intent.get_attached() {
schedule_context.push_attached(*attached);
}
}
schedule_context
}
let schedule_context = make_schedule_context(&shards);
let optimization_a_prepare =
shards[2].optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_prepare,
Some(ScheduleOptimization {
sequence: shards[2].sequence,
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
})
);
shards[2].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
let schedule_context = make_schedule_context(&shards);
let optimization_a_migrate =
shards[2].optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_migrate,
Some(ScheduleOptimization {
sequence: shards[2].sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(2),
new_attached_node_id: NodeId(1)
})
})
);
shards[2].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
// assert_eq!(
// optimization_a_cleanup,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
for mut shard in shards {
shard.intent.clear(&mut scheduler);
}
Ok(())
}
#[test]
fn optimize_secondary() -> anyhow::Result<()> {
let nodes = make_test_nodes(4, &[]);
@@ -1865,7 +2247,6 @@ pub(crate) mod tests {
// called repeatedly in the background.
// Returns the applied optimizations
fn optimize_til_idle(
nodes: &HashMap<NodeId, Node>,
scheduler: &mut Scheduler,
shards: &mut [TenantShard],
) -> Vec<ScheduleOptimization> {
@@ -1883,7 +2264,12 @@ pub(crate) mod tests {
}
for shard in shards.iter_mut() {
let optimization = shard.optimize_attachment(nodes, &schedule_context);
let optimization = shard.optimize_attachment(scheduler, &schedule_context);
tracing::info!(
"optimize_attachment({})={:?}",
shard.tenant_shard_id,
optimization
);
if let Some(optimization) = optimization {
optimizations.push(optimization.clone());
shard.apply_optimization(scheduler, optimization);
@@ -1892,6 +2278,11 @@ pub(crate) mod tests {
}
let optimization = shard.optimize_secondary(scheduler, &schedule_context);
tracing::info!(
"optimize_secondary({})={:?}",
shard.tenant_shard_id,
optimization
);
if let Some(optimization) = optimization {
optimizations.push(optimization.clone());
shard.apply_optimization(scheduler, optimization);
@@ -1912,18 +2303,40 @@ pub(crate) mod tests {
optimizations
}
use test_log::test;
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
/// that it converges.
#[test]
fn optimize_add_nodes() -> anyhow::Result<()> {
let nodes = make_test_nodes(4, &[]);
let nodes = make_test_nodes(
9,
&[
// Initial 6 nodes
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
AvailabilityZone("az-c".to_string()),
// Three we will add later
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
// Only show the scheduler a couple of nodes
// Only show the scheduler two nodes in each AZ to start with
let mut scheduler = Scheduler::new([].iter());
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
for i in 1..=6 {
scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
}
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
let mut shards = make_test_tenant(
PlacementPolicy::Attached(1),
ShardCount::new(4),
Some(AvailabilityZone("az-a".to_string())),
);
let mut schedule_context = ScheduleContext::default();
for shard in &mut shards {
assert!(shard
@@ -1931,30 +2344,50 @@ pub(crate) mod tests {
.is_ok());
}
// We should see equal number of locations on the two nodes.
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
// Initial: attached locations land in the tenant's home AZ.
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
// Add another two nodes: we should see the shards spread out when their optimize
// methods are called
scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
optimize_til_idle(&nodes, &mut scheduler, &mut shards);
// Initial: secondary locations in a remote AZ
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
// Add another three nodes: we should see the shards spread out when their optimize
// methods are called
scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
optimize_til_idle(&mut scheduler, &mut shards);
// We expect one attached location was moved to the new node in the tenant's home AZ
assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
// The original node has one less attached shard
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
// One of the original nodes still has two attachments, since there are an odd number of nodes
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
// None of our secondaries moved, since we already had enough nodes for those to be
// scheduled perfectly
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
for shard in shards.iter_mut() {
shard.intent.clear(&mut scheduler);
@@ -1994,10 +2427,10 @@ pub(crate) mod tests {
shard.schedule(&mut scheduler, context).unwrap();
}
let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
assert_eq!(applied_to_a, vec![]);
let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
assert_eq!(applied_to_b, vec![]);
for shard in a.iter_mut().chain(b.iter_mut()) {

View File

@@ -459,10 +459,12 @@ pub async fn get_timeline_objects(
Ok(list.keys)
}
const MAX_KEYS_PER_DELETE: usize = 1000;
/// Drain a buffer of keys into DeleteObjects requests
///
/// If `drain` is true, drains keys completely; otherwise stops when <
/// `max_keys_per_delete`` keys are left.
/// MAX_KEYS_PER_DELETE keys are left.
/// `num_deleted` returns number of deleted keys.
async fn do_delete(
remote_client: &GenericRemoteStorage,
@@ -472,10 +474,9 @@ async fn do_delete(
progress_tracker: &mut DeletionProgressTracker,
) -> anyhow::Result<()> {
let cancel = CancellationToken::new();
let max_keys_per_delete = remote_client.max_keys_per_delete();
while (!keys.is_empty() && drain) || (keys.len() >= max_keys_per_delete) {
while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
let request_keys =
keys.split_off(keys.len() - (std::cmp::min(max_keys_per_delete, keys.len())));
keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
let request_keys: Vec<RemotePath> = request_keys.into_iter().map(|o| o.key).collect();
@@ -616,7 +617,7 @@ pub async fn purge_garbage(
}
objects_to_delete.append(&mut object_list);
if objects_to_delete.len() >= remote_client.max_keys_per_delete() {
if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
do_delete(
&remote_client,
&mut objects_to_delete,

View File

@@ -268,7 +268,7 @@ impl BucketConfig {
config.bucket_name, config.bucket_region
),
RemoteStorageKind::AzureContainer(config) => format!(
"container {}, storage account {:?}, region {}",
"bucket {}, storage account {:?}, region {}",
config.container_name, config.storage_account, config.container_region
),
}

View File

@@ -86,8 +86,6 @@ enum Command {
/// For safekeeper node_kind only, json list of timelines and their lsn info
#[arg(long, default_value = None)]
timeline_lsns: Option<String>,
#[arg(long, default_value_t = false)]
verbose: bool,
},
TenantSnapshot {
#[arg(long = "tenant-id")]
@@ -168,7 +166,6 @@ async fn main() -> anyhow::Result<()> {
dump_db_connstr,
dump_db_table,
timeline_lsns,
verbose,
} => {
if let NodeKind::Safekeeper = node_kind {
let db_or_list = match (timeline_lsns, dump_db_connstr) {
@@ -206,7 +203,6 @@ async fn main() -> anyhow::Result<()> {
tenant_ids,
json,
post_to_storcon,
verbose,
cli.exit_code,
)
.await
@@ -317,7 +313,6 @@ pub async fn run_cron_job(
Vec::new(),
true,
post_to_storcon,
false, // default to non-verbose mode
exit_code,
)
.await?;
@@ -367,13 +362,12 @@ pub async fn scan_pageserver_metadata_cmd(
tenant_shard_ids: Vec<TenantShardId>,
json: bool,
post_to_storcon: bool,
verbose: bool,
exit_code: bool,
) -> anyhow::Result<()> {
if controller_client.is_none() && post_to_storcon {
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
}
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)

View File

@@ -21,12 +21,8 @@ pub struct MetadataSummary {
tenant_count: usize,
timeline_count: usize,
timeline_shard_count: usize,
/// Tenant-shard timeline (key) mapping to errors. The key has to be a string because it will be serialized to a JSON.
/// The key is generated using `TenantShardTimelineId::to_string()`.
with_errors: HashMap<String, Vec<String>>,
/// Tenant-shard timeline (key) mapping to warnings. The key has to be a string because it will be serialized to a JSON.
/// The key is generated using `TenantShardTimelineId::to_string()`.
with_warnings: HashMap<String, Vec<String>>,
with_errors: HashSet<TenantShardTimelineId>,
with_warnings: HashSet<TenantShardTimelineId>,
with_orphans: HashSet<TenantShardTimelineId>,
indices_by_version: HashMap<usize, usize>,
@@ -56,12 +52,7 @@ impl MetadataSummary {
}
}
fn update_analysis(
&mut self,
id: &TenantShardTimelineId,
analysis: &TimelineAnalysis,
verbose: bool,
) {
fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
if analysis.is_healthy() {
self.healthy_tenant_shards.insert(id.tenant_shard_id);
} else {
@@ -70,17 +61,11 @@ impl MetadataSummary {
}
if !analysis.errors.is_empty() {
let entry = self.with_errors.entry(id.to_string()).or_default();
if verbose {
entry.extend(analysis.errors.iter().cloned());
}
self.with_errors.insert(*id);
}
if !analysis.warnings.is_empty() {
let entry = self.with_warnings.entry(id.to_string()).or_default();
if verbose {
entry.extend(analysis.warnings.iter().cloned());
}
self.with_warnings.insert(*id);
}
}
@@ -135,7 +120,6 @@ Index versions: {version_summary}
pub async fn scan_pageserver_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
verbose: bool,
) -> anyhow::Result<MetadataSummary> {
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
@@ -180,7 +164,6 @@ pub async fn scan_pageserver_metadata(
mut tenant_objects: TenantObjectListing,
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
highest_shard_count: ShardCount,
verbose: bool,
) {
summary.tenant_count += 1;
@@ -220,7 +203,7 @@ pub async fn scan_pageserver_metadata(
Some(data),
)
.await;
summary.update_analysis(&ttid, &analysis, verbose);
summary.update_analysis(&ttid, &analysis);
timeline_ids.insert(ttid.timeline_id);
} else {
@@ -288,6 +271,10 @@ pub async fn scan_pageserver_metadata(
summary.update_data(&data);
match tenant_id {
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
@@ -300,7 +287,6 @@ pub async fn scan_pageserver_metadata(
tenant_objects,
timelines,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
.await;
@@ -310,10 +296,6 @@ pub async fn scan_pageserver_metadata(
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
match &data.blob_data {
@@ -344,7 +326,6 @@ pub async fn scan_pageserver_metadata(
tenant_objects,
tenant_timeline_results,
highest_shard_count,
verbose,
)
.instrument(info_span!("analyze-tenant", tenant = %tenant_id))
.await;

View File

@@ -1,21 +0,0 @@
# How to run the `pg_regress` tests on a cloud Neon instance.
* Create a Neon project on staging.
* Grant the superuser privileges to the DB user.
* (Optional) create a branch for testing
* Configure the endpoint by updating the control-plane database with the following settings:
* `Timeone`: `America/Los_Angeles`
* `DateStyle`: `Postgres,MDY`
* `compute_query_id`: `off`
* Checkout the actual `Neon` sources
* Patch the sql and expected files for the specific PostgreSQL version, e.g. for v17:
```bash
$ cd vendor/postgres-v17
$ patch -p1 <../../compute/patches/cloud_regress_pg17.patch
```
* Set the environment variable `BENCHMARK_CONNSTR` to the connection URI of your project.
* Set the environment variable `PG_VERSION` to the version of your project.
* Run
```bash
$ pytest -m remote_cluster -k cloud_regress
```

View File

@@ -5,15 +5,68 @@ Run the regression tests on the cloud instance of Neon
from __future__ import annotations
from pathlib import Path
from typing import Any
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import RemotePostgres
from fixtures.pg_version import PgVersion
@pytest.fixture
def setup(remote_pg: RemotePostgres):
"""
Setup and teardown of the tests
"""
with psycopg2.connect(remote_pg.connstr()) as conn:
with conn.cursor() as cur:
log.info("Creating the extension")
cur.execute("CREATE EXTENSION IF NOT EXISTS regress_so")
conn.commit()
# TODO: Migrate to branches and remove this code
log.info("Looking for subscriptions in the regress database")
cur.execute(
"SELECT subname FROM pg_catalog.pg_subscription WHERE "
"subdbid = (SELECT oid FROM pg_catalog.pg_database WHERE datname='regression');"
)
if cur.rowcount > 0:
with psycopg2.connect(
dbname="regression",
host=remote_pg.default_options["host"],
user=remote_pg.default_options["user"],
password=remote_pg.default_options["password"],
) as regress_conn:
with regress_conn.cursor() as regress_cur:
for sub in cur:
regress_cur.execute(f"ALTER SUBSCRIPTION {sub[0]} DISABLE")
regress_cur.execute(
f"ALTER SUBSCRIPTION {sub[0]} SET (slot_name = NONE)"
)
regress_cur.execute(f"DROP SUBSCRIPTION {sub[0]}")
regress_conn.commit()
yield
# TODO: Migrate to branches and remove this code
log.info("Looking for extra roles...")
with psycopg2.connect(remote_pg.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT rolname FROM pg_catalog.pg_roles WHERE oid > 16384 AND rolname <> 'neondb_owner'"
)
roles: list[Any] = []
for role in cur:
log.info("Role found: %s", role[0])
roles.append(role[0])
for role in roles:
cur.execute(f"DROP ROLE {role}")
conn.commit()
@pytest.mark.timeout(7200)
@pytest.mark.remote_cluster
def test_cloud_regress(
setup,
remote_pg: RemotePostgres,
pg_version: PgVersion,
pg_distrib_dir: Path,

View File

@@ -152,8 +152,6 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_resident_physical_size",
"pageserver_io_operations_bytes_total",
"pageserver_last_record_lsn",
"pageserver_disk_consistent_lsn",
"pageserver_projected_remote_consistent_lsn",
"pageserver_standby_horizon",
"pageserver_smgr_query_seconds_bucket",
"pageserver_smgr_query_seconds_count",
@@ -175,8 +173,6 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),
counter("pageserver_tenant_throttling_count"),
counter("pageserver_timeline_wal_records_received"),
counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
*histogram("pageserver_page_service_batch_size"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
# "pageserver_directory_entries_count", -- only used if above a certain threshold

View File

@@ -4556,7 +4556,6 @@ class StorageScrubber:
def __init__(self, env: NeonEnv, log_dir: Path):
self.env = env
self.log_dir = log_dir
self.allowed_errors: list[str] = []
def scrubber_cli(
self, args: list[str], timeout, extra_env: dict[str, str] | None = None
@@ -4634,70 +4633,19 @@ class StorageScrubber:
if timeline_lsns is not None:
args.append("--timeline-lsns")
args.append(json.dumps(timeline_lsns))
if node_kind == NodeKind.PAGESERVER:
args.append("--verbose")
stdout = self.scrubber_cli(args, timeout=30, extra_env=extra_env)
try:
summary = json.loads(stdout)
healthy = self._check_run_healthy(summary)
# summary does not contain "with_warnings" if node_kind is the safekeeper
no_warnings = "with_warnings" not in summary or not summary["with_warnings"]
healthy = not summary["with_errors"] and no_warnings
return healthy, summary
except:
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
log.error(stdout)
raise
def _check_line_allowed(self, line: str) -> bool:
for a in self.allowed_errors:
try:
if re.match(a, line):
return True
except re.error:
log.error(f"Invalid regex: '{a}'")
raise
return False
def _check_line_list_allowed(self, lines: list[str]) -> bool:
for line in lines:
if not self._check_line_allowed(line):
return False
return True
def _check_run_healthy(self, summary: dict[str, Any]) -> bool:
# summary does not contain "with_warnings" if node_kind is the safekeeper
healthy = True
with_warnings = summary.get("with_warnings", None)
if with_warnings is not None:
if isinstance(with_warnings, list):
if len(with_warnings) > 0:
# safekeeper scan_metadata output is a list of tenants
healthy = False
else:
for _, warnings in with_warnings.items():
assert (
len(warnings) > 0
), "with_warnings value should not be empty, running without verbose mode?"
if not self._check_line_list_allowed(warnings):
healthy = False
break
if not healthy:
return healthy
with_errors = summary.get("with_errors", None)
if with_errors is not None:
if isinstance(with_errors, list):
if len(with_errors) > 0:
# safekeeper scan_metadata output is a list of tenants
healthy = False
else:
for _, errors in with_errors.items():
assert (
len(errors) > 0
), "with_errors value should not be empty, running without verbose mode?"
if not self._check_line_list_allowed(errors):
healthy = False
break
return healthy
def tenant_snapshot(self, tenant_id: TenantId, output_path: Path):
stdout = self.scrubber_cli(
["tenant-snapshot", "--tenant-id", str(tenant_id), "--output-path", str(output_path)],

View File

@@ -850,7 +850,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
force_repartition=False,
force_image_layer_creation=False,
force_l0_compaction=False,
wait_until_flushed=True,
wait_until_uploaded=False,
compact: bool | None = None,
**kwargs,
@@ -863,8 +862,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
query["force_image_layer_creation"] = "true"
if force_l0_compaction:
query["force_l0_compaction"] = "true"
if not wait_until_flushed:
query["wait_until_flushed"] = "false"
if wait_until_uploaded:
query["wait_until_uploaded"] = "true"
@@ -872,7 +869,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
query["compact"] = "true" if compact else "false"
log.info(
f"Requesting checkpoint: tenant={tenant_id} timeline={timeline_id} wait_until_flushed={wait_until_flushed} wait_until_uploaded={wait_until_uploaded} compact={compact}"
f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}"
)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",

View File

@@ -54,15 +54,23 @@ def wait_for_upload(
tenant: TenantId | TenantShardId,
timeline: TimelineId,
lsn: Lsn,
timeout=20,
):
"""Waits for local timeline upload up to specified LSN"""
"""waits for local timeline upload up to specified lsn"""
def is_uploaded():
remote_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
assert remote_lsn >= lsn, f"remote_consistent_lsn at {remote_lsn}"
wait_until(is_uploaded, name=f"upload to {lsn}", timeout=timeout)
current_lsn = Lsn(0)
for i in range(20):
current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
log.info("wait finished")
return
lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
log.info(
f"waiting for remote_consistent_lsn to reach {lsn}, now {current_lsn}, last_record_lsn={lr_lsn}, iteration {i + 1}"
)
time.sleep(1)
raise Exception(
f"timed out while waiting for {tenant}/{timeline} remote_consistent_lsn to reach {lsn}, was {current_lsn}"
)
def _tenant_in_expected_state(tenant_info: dict[str, Any], expected_state: str):

View File

@@ -1,142 +0,0 @@
from __future__ import annotations
import random
from concurrent.futures import ThreadPoolExecutor
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import s3_storage
@pytest.mark.timeout(900)
@pytest.mark.parametrize("size", [8, 1024, 8192])
@pytest.mark.parametrize("s3", [True, False], ids=["s3", "local"])
@pytest.mark.parametrize("backpressure", [True, False], ids=["backpressure", "nobackpressure"])
@pytest.mark.parametrize("fsync", [True, False], ids=["fsync", "nofsync"])
def test_ingest_insert_bulk(
request: pytest.FixtureRequest,
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
fsync: bool,
backpressure: bool,
s3: bool,
size: int,
):
"""
Benchmarks ingestion of 5 GB of sequential insert WAL. Measures ingestion and S3 upload
separately. Also does a Safekeeper→Pageserver re-ingestion to measure Pageserver ingestion in
isolation.
"""
CONCURRENCY = 1 # 1 is optimal without fsync or backpressure
VOLUME = 5 * 1024**3
rows = VOLUME // (size + 64) # +64 roughly accounts for per-row WAL overhead
neon_env_builder.safekeepers_enable_fsync = fsync
if s3:
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
# NB: don't use S3 for Safekeeper. It doesn't affect throughput (no backpressure), but it
# would compete with Pageserver for bandwidth.
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
neon_env_builder.disable_scrub_on_exit() # immediate shutdown may leave stray layers
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
f"fsync = {fsync}",
"max_replication_apply_lag = 0",
f"max_replication_flush_lag = {'10GB' if backpressure else '0'}",
# NB: neon_local defaults to 15MB, which is too slow -- production uses 500MB.
f"max_replication_write_lag = {'500MB' if backpressure else '0'}",
],
)
endpoint.safe_psql("create extension neon")
# Wait for the timeline to be propagated to the pageserver.
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
# Ingest rows.
log.info("Ingesting data")
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
def insert_rows(endpoint, table, count, value):
with endpoint.connect().cursor() as cur:
cur.execute("set statement_timeout = 0")
cur.execute(f"create table {table} (id int, data bytea)")
cur.execute(f"insert into {table} values (generate_series(1, {count}), %s)", (value,))
with zenbenchmark.record_duration("upload"):
with zenbenchmark.record_duration("ingest"):
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
for i in range(CONCURRENCY):
# Write a random value for all rows. This is sufficient to prevent compression,
# e.g. in TOAST. Randomly generating every row is too slow.
value = random.randbytes(size)
worker_rows = rows / CONCURRENCY
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
# Wait for pageserver to ingest the WAL.
client = env.pageserver.http_client()
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
# Wait for pageserver S3 upload. Checkpoint to flush the last in-memory layer.
client.timeline_checkpoint(
env.initial_tenant,
env.initial_timeline,
compact=False,
wait_until_flushed=False,
)
wait_for_upload(client, env.initial_tenant, env.initial_timeline, end_lsn, timeout=600)
# Empty out upload queue for next benchmark.
wait_for_upload_queue_empty(client, env.initial_tenant, env.initial_timeline)
backpressure_time = endpoint.safe_psql("select backpressure_throttling_time()")[0][0]
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
# reingest all the WAL directly from the safekeeper. This gives us a baseline of how fast the
# pageserver can ingest this WAL in isolation.
status = env.storage_controller.inspect(tenant_shard_id=env.initial_tenant)
assert status is not None
endpoint.stop() # avoid spurious getpage errors
client.tenant_delete(env.initial_tenant)
env.pageserver.tenant_create(tenant_id=env.initial_tenant, generation=status[0])
with zenbenchmark.record_duration("recover"):
log.info("Recovering WAL into pageserver")
client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
# Emit metrics.
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
zenbenchmark.record("row_count", rows, "rows", MetricReport.TEST_PARAM)
zenbenchmark.record("concurrency", CONCURRENCY, "clients", MetricReport.TEST_PARAM)
zenbenchmark.record(
"backpressure_time", backpressure_time // 1000, "ms", MetricReport.LOWER_IS_BETTER
)
props = {p["name"]: p["value"] for _, p in request.node.user_properties}
for name in ("ingest", "upload", "recover"):
throughput = int(wal_written_mb / props[name])
zenbenchmark.record(f"{name}_throughput", throughput, "MB/s", MetricReport.HIGHER_IS_BETTER)
# Pageserver shutdown will likely get stuck on the upload queue, just shut it down immediately.
env.stop(immediate=True)

View File

@@ -86,7 +86,7 @@ def test_storage_controller_many_tenants(
AZS = ["alpha", "bravo", "charlie"]
neon_env_builder.pageserver_config_override = lambda ps_cfg: ps_cfg.update(
{"availability_zone": f"az-{AZS[ps_cfg['id'] % len(AZS)]}"}
{"availability_zone": f"az-{AZS[(ps_cfg['id'] - 1) % len(AZS)]}"}
)
# A small sleep on each call into the notify hook, to simulate the latency of doing a database write
@@ -114,8 +114,8 @@ def test_storage_controller_many_tenants(
ps.allowed_errors.append(".*request was dropped before completing.*")
# Total tenants
small_tenant_count = 7800
large_tenant_count = 200
small_tenant_count = 780
large_tenant_count = 20
tenant_count = small_tenant_count + large_tenant_count
large_tenant_shard_count = 8
total_shards = small_tenant_count + large_tenant_count * large_tenant_shard_count
@@ -141,7 +141,7 @@ def test_storage_controller_many_tenants(
# We will create timelines in only a subset of tenants, because creating timelines
# does many megabytes of IO, and we want to densely simulate huge tenant counts on
# a single test node.
tenant_timelines_count = 100
tenant_timelines_count = 10
# These lists are maintained for use with rng.choice
tenants_with_timelines = list(rng.sample(list(tenants.keys()), tenant_timelines_count))
@@ -380,7 +380,7 @@ def test_storage_controller_many_tenants(
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts before rolling restart: {shard_counts}")
assert_consistent_balanced_attachments(env, total_shards)
# assert_consistent_balanced_attachments(env, total_shards)
# Restart pageservers gracefully: this exercises the /re-attach pageserver API
# and the storage controller drain and fill API
@@ -445,7 +445,7 @@ def test_storage_controller_many_tenants(
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
assert_consistent_balanced_attachments(env, total_shards)
# assert_consistent_balanced_attachments(env, total_shards)
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
env.storage_controller.consistency_check()

View File

@@ -15,7 +15,7 @@ from fixtures.pageserver.http import PageserverApiException
from fixtures.utils import skip_in_debug_build, wait_until
from fixtures.workload import Workload
AGGRESSIVE_COMPACTION_TENANT_CONF = {
AGGRESIVE_COMPACTION_TENANT_CONF = {
# Disable gc and compaction. The test runs compaction manually.
"gc_period": "0s",
"compaction_period": "0s",
@@ -24,7 +24,6 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
# Compact small layers
"compaction_target_size": 1024**2,
"image_creation_threshold": 2,
# "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later
}
@@ -52,7 +51,7 @@ def test_pageserver_compaction_smoke(
page_cache_size=10
"""
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF)
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -121,25 +120,14 @@ page_cache_size=10
assert vectored_average < 8
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
"compaction_period": "5s",
# No PiTR interval and small GC horizon
"pitr_interval": "0s",
"gc_horizon": f"{1024 ** 2}",
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
row_count = 10000
churn_rounds = 50
row_count = 1000
churn_rounds = 10
ps_http = env.pageserver.http_client()
@@ -153,35 +141,23 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")
# Run gc-compaction every 10 rounds to ensure the test doesn't take too long time.
ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"scheduled": True,
"sub_compaction": True,
"compact_range": {
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
},
)
workload.churn_rows(row_count, env.pageserver.id)
# ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
env.pageserver.assert_log_contains(
"scheduled_compact_timeline.*picked .* layers for compaction"
)
# Force L0 compaction to ensure the number of layers is within bounds, so that gc-compaction can run.
ps_http.timeline_compact(tenant_id, timeline_id, force_l0_compaction=True)
assert ps_http.perf_info(tenant_id, timeline_id)[0]["num_of_l0"] <= 1
ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
)
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_gc(tenant_id, timeline_id, None)
# Stripe sizes in number of pages.
TINY_STRIPES = 16

View File

@@ -215,7 +215,7 @@ if SQL_EXPORTER is None:
#
# The "host" network mode allows sql_exporter to talk to the
# endpoint which is running on the host.
super().__init__("docker.io/burningalchemist/sql_exporter:0.16.0", network_mode="host")
super().__init__("docker.io/burningalchemist/sql_exporter:0.13.1", network_mode="host")
self.__logs_dir = logs_dir
self.__port = port

View File

@@ -1,124 +0,0 @@
import threading
import time
from fixtures.neon_fixtures import NeonEnv
BTREE_NUM_CYCLEID_PAGES = """
WITH raw_pages AS (
SELECT blkno, get_raw_page_at_lsn('t_uidx', 'main', blkno, NULL, NULL) page
FROM generate_series(1, pg_relation_size('t_uidx'::regclass) / 8192) blkno
),
parsed_pages AS (
/* cycle ID is the last 2 bytes of the btree page */
SELECT blkno, SUBSTRING(page FROM 8191 FOR 2) as cycle_id
FROM raw_pages
)
SELECT count(*),
encode(cycle_id, 'hex')
FROM parsed_pages
WHERE encode(cycle_id, 'hex') != '0000'
GROUP BY encode(cycle_id, 'hex');
"""
def test_nbtree_pagesplit_cycleid(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
ses1 = endpoint.connect().cursor()
ses1.execute("ALTER SYSTEM SET autovacuum = off;")
ses1.execute("ALTER SYSTEM SET enable_seqscan = off;")
ses1.execute("ALTER SYSTEM SET full_page_writes = off;")
ses1.execute("SELECT pg_reload_conf();")
ses1.execute("CREATE EXTENSION neon_test_utils;")
# prepare a large index
ses1.execute("CREATE TABLE t(id integer GENERATED ALWAYS AS IDENTITY, txt text);")
ses1.execute("CREATE UNIQUE INDEX t_uidx ON t(id);")
ses1.execute("INSERT INTO t (txt) SELECT i::text FROM generate_series(1, 2035) i;")
ses1.execute("SELECT neon_xlogflush();")
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
pages = ses1.fetchall()
assert (
len(pages) == 0
), f"0 back splits with cycle ID expected, real {len(pages)} first {pages[0]}"
# Delete enough tuples to clear the first index page.
# (there are up to 407 rows per 8KiB page; 406 for non-rightmost leafs.
ses1.execute("DELETE FROM t WHERE id <= 406;")
# Make sure the page is cleaned up
ses1.execute("VACUUM (FREEZE, INDEX_CLEANUP ON) t;")
# Do another delete-then-indexcleanup cycle, to move the pages from
# "dead" to "reusable"
ses1.execute("DELETE FROM t WHERE id <= 446;")
ses1.execute("VACUUM (FREEZE, INDEX_CLEANUP ON) t;")
# Make sure the vacuum we're about to trigger in s3 has cleanup work to do
ses1.execute("DELETE FROM t WHERE id <= 610;")
# Flush wal, for checking purposes
ses1.execute("SELECT neon_xlogflush();")
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
pages = ses1.fetchall()
assert len(pages) == 0, f"No back splits with cycle ID expected, got batches of {pages} instead"
ses2 = endpoint.connect().cursor()
ses3 = endpoint.connect().cursor()
# Session 2 pins a btree page, which prevents vacuum from processing that
# page, thus allowing us to reliably split pages while a concurrent vacuum
# is running.
ses2.execute("BEGIN;")
ses2.execute(
"DECLARE foo NO SCROLL CURSOR FOR SELECT row_number() over () FROM t ORDER BY id ASC"
)
ses2.execute("FETCH FROM foo;") # pins the leaf page with id 611
wait_evt = threading.Event()
# Session 3 runs the VACUUM command. Note that this will block, and
# therefore must run on another thread.
# We rely on this running quickly enough to hit the pinned page from
# session 2 by the time we start other work again in session 1, but
# technically there is a race where the thread (and/or PostgreSQL process)
# don't get to that pinned page with vacuum until >2s after evt.set() was
# called, and session 1 thus might already have split pages.
def vacuum_freeze_t(ses3, evt: threading.Event):
# Begin parallel vacuum that should hit the index
evt.set()
# this'll hang until s2 fetches enough new data from its cursor.
# this is technically a race with the time.sleep(2) below, but if this
# command doesn't hit
ses3.execute("VACUUM (FREEZE, INDEX_CLEANUP on, DISABLE_PAGE_SKIPPING on) t;")
ses3t = threading.Thread(target=vacuum_freeze_t, args=(ses3, wait_evt))
ses3t.start()
wait_evt.wait()
# Make extra sure we got the thread started and vacuum is stuck, by waiting
# some time even after wait_evt got set. This isn't truly reliable (it is
# possible
time.sleep(2)
# Insert 2 pages worth of new data.
# This should reuse the one empty page, plus another page at the end of
# the index relation; with split ordering
# old_blk -> blkno=1 -> old_blk + 1.
# As this is run while vacuum in session 3 is happening, these splits
# should receive cycle IDs where applicable.
ses1.execute("INSERT INTO t (txt) SELECT i::text FROM generate_series(1, 812) i;")
# unpin the btree page, allowing s3's vacuum to complete
ses2.execute("FETCH ALL FROM foo;")
ses2.execute("ROLLBACK;")
# flush WAL to make sure PS is up-to-date
ses1.execute("SELECT neon_xlogflush();")
# check that our expectations are correct
ses1.execute(BTREE_NUM_CYCLEID_PAGES)
pages = ses1.fetchall()
assert (
len(pages) == 1 and pages[0][0] == 3
), f"3 page splits with cycle ID expected; actual {pages}"
# final cleanup
ses3t.join()
ses1.close()
ses2.close()
ses3.close()

View File

@@ -516,14 +516,18 @@ def test_sharding_split_smoke(
shard_count = 2
# Shard count we split into
split_shard_count = 4
# We will have 2 shards per pageserver once done (including secondaries)
neon_env_builder.num_pageservers = split_shard_count
# In preferred AZ & other AZ we will end up with one shard per pageserver
neon_env_builder.num_pageservers = split_shard_count * 2
# Two AZs
def assign_az(ps_cfg):
az = f"az-{(ps_cfg['id'] - 1) % 2}"
ps_cfg["availability_zone"] = az
# We will run more pageservers than tests usually do, so give them tiny page caches
# in case we're on a test node under memory pressure.
ps_cfg["page_cache_size"] = 128
neon_env_builder.pageserver_config_override = assign_az
# 1MiB stripes: enable getting some meaningful data distribution without
@@ -659,8 +663,8 @@ def test_sharding_split_smoke(
# - shard_count reconciles for the original setup of the tenant
# - shard_count reconciles for detaching the original secondary locations during split
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
# - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move)
expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2
# - split_shard_count/2 reconciles to migrate shards to their temporary secondaries
expect_reconciles = shard_count * 2 + split_shard_count + 3 * (split_shard_count / 2)
reconcile_ok = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
@@ -725,10 +729,14 @@ def test_sharding_split_smoke(
# dominated by shard count.
log.info(f"total: {total}")
assert total == {
1: 2,
2: 2,
3: 2,
4: 2,
1: 1,
2: 1,
3: 1,
4: 1,
5: 1,
6: 1,
7: 1,
8: 1,
}
# The controller is not required to lay out the attached locations in any particular way, but

View File

@@ -3011,11 +3011,12 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
def assign_az(ps_cfg):
az = f"az-{ps_cfg['id']}"
az = f"az-{ps_cfg['id'] % 2}"
log.info("Assigned AZ {az}")
ps_cfg["availability_zone"] = az
neon_env_builder.pageserver_config_override = assign_az
neon_env_builder.num_pageservers = 2
neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_configs()
env.start()
@@ -3030,8 +3031,14 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
assert shards[0]["preferred_az_id"] == expected_az
# When all other schedule scoring parameters are equal, tenants should round-robin on AZs
assert env.storage_controller.tenant_describe(tids[0])["shards"][0]["preferred_az_id"] == "az-1"
assert env.storage_controller.tenant_describe(tids[1])["shards"][0]["preferred_az_id"] == "az-0"
assert env.storage_controller.tenant_describe(tids[2])["shards"][0]["preferred_az_id"] == "az-1"
# Try modifying preferred AZ
updated = env.storage_controller.set_preferred_azs(
{TenantShardId(tid, 0, 0): "foo" for tid in tids}
{TenantShardId(tid, 0, 0): "az-0" for tid in tids}
)
assert set(updated) == set([TenantShardId(tid, 0, 0) for tid in tids])
@@ -3039,29 +3046,24 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
for tid in tids:
shards = env.storage_controller.tenant_describe(tid)["shards"]
assert len(shards) == 1
assert shards[0]["preferred_az_id"] == "foo"
assert shards[0]["preferred_az_id"] == "az-0"
# Generate a layer to avoid shard split handling on ps from tripping
# up on debug assert.
timeline_id = TimelineId.generate()
env.create_timeline("bar", tids[0], timeline_id)
workload = Workload(env, tids[0], timeline_id, branch_name="bar")
workload.init()
workload.write_rows(256)
workload.validate()
# Having modified preferred AZ, we should get moved there
env.storage_controller.reconcile_until_idle(max_interval=0.1)
for tid in tids:
shard = env.storage_controller.tenant_describe(tid)["shards"][0]
attached_to = shard["node_attached"]
attached_in_az = env.get_pageserver(attached_to).az_id
assert shard["preferred_az_id"] == attached_in_az == "az-0"
env.storage_controller.tenant_shard_split(tids[0], shard_count=2)
env.storage_controller.reconcile_until_idle(max_interval=0.1)
shards = env.storage_controller.tenant_describe(tids[0])["shards"]
assert len(shards) == 2
for shard in shards:
attached_to = shard["node_attached"]
expected_az = env.get_pageserver(attached_to).az_id
# The scheduling optimization logic is not yet AZ-aware, so doesn't succeed
# in putting the tenant shards in the preferred AZ.
# To be fixed in https://github.com/neondatabase/neon/pull/9916
# assert shard["preferred_az_id"] == expected_az
attached_in_az = env.get_pageserver(attached_to).az_id
assert shard["preferred_az_id"] == attached_in_az == "az-0"
@run_only_on_default_postgres("Postgres version makes no difference here")
@@ -3230,55 +3232,3 @@ def test_multi_attached_timeline_creation(neon_env_builder: NeonEnvBuilder, migr
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
raise
@run_only_on_default_postgres("Postgres version makes no difference here")
def test_storage_controller_detached_stopped(
neon_env_builder: NeonEnvBuilder,
):
"""
Test that detaching a tenant while it has scheduling policy set to Paused or Stop works
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.num_pageservers = 1
env = neon_env_builder.init_configs()
env.start()
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id,
shard_count=1,
)
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
# Disable scheduling: ordinarily this would prevent the tenant's configuration being
# reconciled to pageservers, but this should be overridden when detaching.
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy.*")
env.storage_controller.tenant_policy_update(
tenant_id,
{"scheduling": "Stop"},
)
env.storage_controller.consistency_check()
# Detach the tenant
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "Detached",
"secondary_conf": None,
"tenant_conf": {},
"generation": None,
},
)
env.storage_controller.consistency_check()
# Confirm the detach happened
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []

View File

@@ -572,10 +572,4 @@ def test_scrubber_scan_pageserver_metadata(
unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)
healthy, _ = env.storage_scrubber.scan_metadata()
assert not healthy
env.storage_scrubber.allowed_errors.append(".*not present in remote storage.*")
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
neon_env_builder.disable_scrub_on_exit() # We already ran scrubber, no need to do an extra run
neon_env_builder.disable_scrub_on_exit()

View File

@@ -4,7 +4,7 @@ import time
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, fork_at_current_lsn
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
from fixtures.utils import query_scalar
@@ -292,76 +292,3 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
tup = cur.fetchall()
log.info(f"tuple = {tup}")
cur.execute("commit transaction")
def test_check_visibility_map(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
Runs pgbench across a few databases on a sharded tenant, then performs a visibility map
consistency check. Regression test for https://github.com/neondatabase/neon/issues/9914.
"""
# Use a large number of shards with small stripe sizes, to ensure the visibility
# map will end up on non-zero shards.
SHARD_COUNT = 8
STRIPE_SIZE = 32 # in 8KB pages
PGBENCH_RUNS = 4
env = neon_env_builder.init_start(
initial_tenant_shard_count=SHARD_COUNT, initial_tenant_shard_stripe_size=STRIPE_SIZE
)
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers = 64MB",
],
)
# Run pgbench in 4 different databases, to exercise different shards.
dbnames = [f"pgbench{i}" for i in range(PGBENCH_RUNS)]
for i, dbname in enumerate(dbnames):
log.info(f"pgbench run {i+1}/{PGBENCH_RUNS}")
endpoint.safe_psql(f"create database {dbname}")
connstr = endpoint.connstr(dbname=dbname)
# pgbench -i will automatically vacuum the tables. This creates the visibility map.
pg_bin.run(["pgbench", "-i", "-s", "10", connstr])
# Freeze the tuples to set the initial frozen bit.
endpoint.safe_psql("vacuum freeze", dbname=dbname)
# Run pgbench.
pg_bin.run(["pgbench", "-c", "32", "-j", "8", "-T", "10", connstr])
# Restart the endpoint to flush the compute page cache. We want to make sure we read VM pages
# from storage, not cache.
endpoint.stop()
endpoint.start()
# Check that the visibility map matches the heap contents for pg_accounts (the main table).
for dbname in dbnames:
log.info(f"Checking visibility map for {dbname}")
with endpoint.cursor(dbname=dbname) as cur:
cur.execute("create extension pg_visibility")
cur.execute("select count(*) from pg_check_visible('pgbench_accounts')")
row = cur.fetchone()
assert row is not None
assert row[0] == 0, f"{row[0]} inconsistent VM pages (visible)"
cur.execute("select count(*) from pg_check_frozen('pgbench_accounts')")
row = cur.fetchone()
assert row is not None
assert row[0] == 0, f"{row[0]} inconsistent VM pages (frozen)"
# Vacuum and freeze the tables, and check that the visibility map is still accurate.
for dbname in dbnames:
log.info(f"Vacuuming and checking visibility map for {dbname}")
with endpoint.cursor(dbname=dbname) as cur:
cur.execute("vacuum freeze")
cur.execute("select count(*) from pg_check_visible('pgbench_accounts')")
row = cur.fetchone()
assert row is not None
assert row[0] == 0, f"{row[0]} inconsistent VM pages (visible)"
cur.execute("select count(*) from pg_check_frozen('pgbench_accounts')")
row = cur.fetchone()
assert row is not None
assert row[0] == 0, f"{row[0]} inconsistent VM pages (frozen)"

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.2",
"01fa3c48664ca030cfb69bb4a350aa9df4691d88"
"a10d95be67265e0f10a422ba0457f5a7af01de71"
],
"v16": [
"16.6",
"81428621f7c04aed03671cf80a928e0a36d92505"
"dff6615a8e48a10bb17a03fa3c00635f1ace7a92"
],
"v15": [
"15.10",
"8736b10c1d93d11b9c0489872dd529c4c0f5338f"
"972e325e62b455957adbbdd8580e31275bb5b8c9"
],
"v14": [
"14.15",
"13ff324150fceaac72920e01742addc053db9462"
"373f9decad933d2d46f321231032ae8b0da81acd"
]
}

View File

@@ -33,7 +33,6 @@ deranged = { version = "0.3", default-features = false, features = ["powerfmt",
digest = { version = "0.10", features = ["mac", "oid", "std"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
form_urlencoded = { version = "1" }
futures-channel = { version = "0.3", features = ["sink"] }
futures-executor = { version = "0.3" }
futures-io = { version = "0.3" }
@@ -79,7 +78,6 @@ sha2 = { version = "0.10", features = ["asm", "oid"] }
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
stable_deref_trait = { version = "1" }
subtle = { version = "2" }
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
@@ -107,7 +105,6 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
displaydoc = { version = "0.2" }
either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
half = { version = "2", default-features = false, features = ["num-traits"] }