mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-12 07:00:36 +00:00
Compare commits
2 Commits
knz/vpc_ca
...
heikki/mit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
328408b925 | ||
|
|
a98fab8b1c |
2
.github/actionlint.yml
vendored
2
.github/actionlint.yml
vendored
@@ -21,5 +21,3 @@ config-variables:
|
||||
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
|
||||
- DEV_AWS_OIDC_ROLE_ARN
|
||||
- BENCHMARK_INGEST_TARGET_PROJECTID
|
||||
- PGREGRESS_PG16_PROJECT_ID
|
||||
- PGREGRESS_PG17_PROJECT_ID
|
||||
|
||||
32
.github/workflows/cloud-regress.yml
vendored
32
.github/workflows/cloud-regress.yml
vendored
@@ -23,14 +23,11 @@ jobs:
|
||||
regress:
|
||||
env:
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
DEFAULT_PG_VERSION: 16
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
pg-version: [16, 17]
|
||||
|
||||
runs-on: us-east-2
|
||||
container:
|
||||
@@ -43,11 +40,9 @@ jobs:
|
||||
submodules: true
|
||||
|
||||
- name: Patch the test
|
||||
env:
|
||||
PG_VERSION: ${{matrix.pg-version}}
|
||||
run: |
|
||||
cd "vendor/postgres-v${PG_VERSION}"
|
||||
patch -p1 < "../../compute/patches/cloud_regress_pg${PG_VERSION}.patch"
|
||||
cd "vendor/postgres-v${DEFAULT_PG_VERSION}"
|
||||
patch -p1 < "../../compute/patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch"
|
||||
|
||||
- name: Generate a random password
|
||||
id: pwgen
|
||||
@@ -60,9 +55,8 @@ jobs:
|
||||
- name: Change tests according to the generated password
|
||||
env:
|
||||
DBPASS: ${{ steps.pwgen.outputs.DBPASS }}
|
||||
PG_VERSION: ${{matrix.pg-version}}
|
||||
run: |
|
||||
cd vendor/postgres-v"${PG_VERSION}"/src/test/regress
|
||||
cd vendor/postgres-v"${DEFAULT_PG_VERSION}"/src/test/regress
|
||||
for fname in sql/*.sql expected/*.out; do
|
||||
sed -i.bak s/NEON_PASSWORD_PLACEHOLDER/"'${DBPASS}'"/ "${fname}"
|
||||
done
|
||||
@@ -79,29 +73,15 @@ jobs:
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
|
||||
- name: Create a new branch
|
||||
id: create-branch
|
||||
uses: ./.github/actions/neon-branch-create
|
||||
with:
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
project_id: ${{ vars[format('PGREGRESS_PG{0}_PROJECT_ID', matrix.pg-version)] }}
|
||||
|
||||
- name: Run the regression tests
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: cloud_regress
|
||||
pg_version: ${{matrix.pg-version}}
|
||||
pg_version: ${{ env.DEFAULT_PG_VERSION }}
|
||||
extra_params: -m remote_cluster
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{steps.create-branch.outputs.dsn}}
|
||||
|
||||
- name: Delete branch
|
||||
uses: ./.github/actions/neon-branch-delete
|
||||
with:
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
project_id: ${{ vars[format('PGREGRESS_PG{0}_PROJECT_ID', matrix.pg-version)] }}
|
||||
branch_id: ${{steps.create-branch.outputs.branch_id}}
|
||||
BENCHMARK_CONNSTR: ${{ secrets.PG_REGRESS_CONNSTR }}
|
||||
|
||||
- name: Create Allure report
|
||||
id: create-allure-report
|
||||
|
||||
33
CODEOWNERS
33
CODEOWNERS
@@ -1,29 +1,16 @@
|
||||
# Autoscaling
|
||||
/libs/vm_monitor/ @neondatabase/autoscaling
|
||||
|
||||
# DevProd
|
||||
/.github/ @neondatabase/developer-productivity
|
||||
|
||||
# Compute
|
||||
/pgxn/ @neondatabase/compute
|
||||
/vendor/ @neondatabase/compute
|
||||
/compute/ @neondatabase/compute
|
||||
/compute_tools/ @neondatabase/compute
|
||||
|
||||
# Proxy
|
||||
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
|
||||
/libs/pageserver_api/ @neondatabase/storage
|
||||
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
|
||||
/libs/proxy/ @neondatabase/proxy
|
||||
/proxy/ @neondatabase/proxy
|
||||
|
||||
# Storage
|
||||
/libs/remote_storage/ @neondatabase/storage
|
||||
/libs/safekeeper_api/ @neondatabase/storage
|
||||
/libs/vm_monitor/ @neondatabase/autoscaling
|
||||
/pageserver/ @neondatabase/storage
|
||||
/pgxn/ @neondatabase/compute
|
||||
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
|
||||
/proxy/ @neondatabase/proxy
|
||||
/safekeeper/ @neondatabase/storage
|
||||
/storage_controller @neondatabase/storage
|
||||
/storage_scrubber @neondatabase/storage
|
||||
/libs/pageserver_api/ @neondatabase/storage
|
||||
/libs/remote_storage/ @neondatabase/storage
|
||||
/libs/safekeeper_api/ @neondatabase/storage
|
||||
|
||||
# Shared
|
||||
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
|
||||
/libs/compute_api/ @neondatabase/compute @neondatabase/control-plane
|
||||
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
|
||||
/vendor/ @neondatabase/compute
|
||||
|
||||
519
Cargo.lock
generated
519
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
10
Cargo.toml
10
Cargo.toml
@@ -51,6 +51,10 @@ anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
atomic-take = "1.1.0"
|
||||
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
|
||||
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage_blobs = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
@@ -212,12 +216,6 @@ postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git",
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
|
||||
## Azure SDK crates
|
||||
azure_core = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
|
||||
azure_identity = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
||||
|
||||
@@ -115,7 +115,7 @@ RUN set -e \
|
||||
|
||||
# Keep the version the same as in compute/compute-node.Dockerfile and
|
||||
# test_runner/regress/test_compute_metrics.py.
|
||||
ENV SQL_EXPORTER_VERSION=0.16.0
|
||||
ENV SQL_EXPORTER_VERSION=0.13.1
|
||||
RUN curl -fsSL \
|
||||
"https://github.com/burningalchemist/sql_exporter/releases/download/${SQL_EXPORTER_VERSION}/sql_exporter-${SQL_EXPORTER_VERSION}.linux-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac).tar.gz" \
|
||||
--output sql_exporter.tar.gz \
|
||||
|
||||
@@ -1324,7 +1324,7 @@ FROM quay.io/prometheuscommunity/postgres-exporter:v0.12.1 AS postgres-exporter
|
||||
|
||||
# Keep the version the same as in build-tools.Dockerfile and
|
||||
# test_runner/regress/test_compute_metrics.py.
|
||||
FROM burningalchemist/sql_exporter:0.16.0 AS sql-exporter
|
||||
FROM burningalchemist/sql_exporter:0.13.1 AS sql-exporter
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
import 'sql_exporter/compute_backpressure_throttling_seconds.libsonnet',
|
||||
import 'sql_exporter/compute_current_lsn.libsonnet',
|
||||
import 'sql_exporter/compute_logical_snapshot_files.libsonnet',
|
||||
import 'sql_exporter/compute_logical_snapshots_bytes.libsonnet',
|
||||
import 'sql_exporter/compute_max_connections.libsonnet',
|
||||
import 'sql_exporter/compute_receive_lsn.libsonnet',
|
||||
import 'sql_exporter/compute_subscriptions_count.libsonnet',
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
SELECT
|
||||
(SELECT current_setting('neon.timeline_id')) AS timeline_id,
|
||||
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
|
||||
-- These temporary snapshot files are renamed to the actual snapshot files
|
||||
-- after they are completely built. We only WAL-log the completely built
|
||||
-- snapshot files
|
||||
(SELECT COALESCE(sum(size), 0) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;
|
||||
@@ -1,17 +0,0 @@
|
||||
local neon = import 'neon.libsonnet';
|
||||
|
||||
local pg_ls_logicalsnapdir = importstr 'sql_exporter/compute_logical_snapshots_bytes.15.sql';
|
||||
local pg_ls_dir = importstr 'sql_exporter/compute_logical_snapshots_bytes.sql';
|
||||
|
||||
{
|
||||
metric_name: 'compute_logical_snapshots_bytes',
|
||||
type: 'gauge',
|
||||
help: 'Size of the pg_logical/snapshots directory, not including temporary files',
|
||||
key_labels: [
|
||||
'timeline_id',
|
||||
],
|
||||
values: [
|
||||
'logical_snapshots_bytes',
|
||||
],
|
||||
query: if neon.PG_MAJORVERSION_NUM < 15 then pg_ls_dir else pg_ls_logicalsnapdir,
|
||||
}
|
||||
@@ -1,9 +0,0 @@
|
||||
SELECT
|
||||
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
|
||||
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
|
||||
-- These temporary snapshot files are renamed to the actual snapshot files
|
||||
-- after they are completely built. We only WAL-log the completely built
|
||||
-- snapshot files
|
||||
(SELECT COALESCE(sum((pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
|
||||
FROM (SELECT * FROM pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
|
||||
) AS logical_snapshots_bytes;
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1243,7 +1243,12 @@ impl ComputeNode {
|
||||
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
|
||||
config::write_postgres_conf(&postgresql_conf_path, &spec, self.http_port)?;
|
||||
|
||||
let max_concurrent_connections = spec.reconfigure_concurrency;
|
||||
// TODO(ololobus): We need a concurrency during reconfiguration as well,
|
||||
// but DB is already running and used by user. We can easily get out of
|
||||
// `max_connections` limit, and the current code won't handle that.
|
||||
// let compute_state = self.state.lock().unwrap().clone();
|
||||
// let max_concurrent_connections = self.max_service_connections(&compute_state, &spec);
|
||||
let max_concurrent_connections = 1;
|
||||
|
||||
// Temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are reconfiguring:
|
||||
|
||||
@@ -53,7 +53,6 @@ use compute_api::spec::Role;
|
||||
use nix::sys::signal::kill;
|
||||
use nix::sys::signal::Signal;
|
||||
use pageserver_api::shard::ShardStripeSize;
|
||||
use reqwest::header::CONTENT_TYPE;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Host;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
@@ -619,7 +618,6 @@ impl Endpoint {
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size: Some(shard_stripe_size),
|
||||
local_proxy_config: None,
|
||||
reconfigure_concurrency: 1,
|
||||
};
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
@@ -819,7 +817,6 @@ impl Endpoint {
|
||||
self.http_address.ip(),
|
||||
self.http_address.port()
|
||||
))
|
||||
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||
.body(format!(
|
||||
"{{\"spec\":{}}}",
|
||||
serde_json::to_string_pretty(&spec)?
|
||||
|
||||
@@ -42,7 +42,6 @@ allow = [
|
||||
"MPL-2.0",
|
||||
"OpenSSL",
|
||||
"Unicode-DFS-2016",
|
||||
"Unicode-3.0",
|
||||
]
|
||||
confidence-threshold = 0.8
|
||||
exceptions = [
|
||||
|
||||
@@ -19,10 +19,6 @@ pub type PgIdent = String;
|
||||
/// String type alias representing Postgres extension version
|
||||
pub type ExtVersion = String;
|
||||
|
||||
fn default_reconfigure_concurrency() -> usize {
|
||||
1
|
||||
}
|
||||
|
||||
/// Cluster spec or configuration represented as an optional number of
|
||||
/// delta operations + final cluster state description.
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
@@ -71,7 +67,7 @@ pub struct ComputeSpec {
|
||||
pub cluster: Cluster,
|
||||
pub delta_operations: Option<Vec<DeltaOp>>,
|
||||
|
||||
/// An optional hint that can be passed to speed up startup time if we know
|
||||
/// An optinal hint that can be passed to speed up startup time if we know
|
||||
/// that no pg catalog mutations (like role creation, database creation,
|
||||
/// extension creation) need to be done on the actual database to start.
|
||||
#[serde(default)] // Default false
|
||||
@@ -90,7 +86,9 @@ pub struct ComputeSpec {
|
||||
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
|
||||
// updated to fill these fields, we can make these non optional.
|
||||
pub tenant_id: Option<TenantId>,
|
||||
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
|
||||
pub pageserver_connstring: Option<String>,
|
||||
|
||||
#[serde(default)]
|
||||
@@ -115,20 +113,6 @@ pub struct ComputeSpec {
|
||||
/// Local Proxy configuration used for JWT authentication
|
||||
#[serde(default)]
|
||||
pub local_proxy_config: Option<LocalProxySpec>,
|
||||
|
||||
/// Number of concurrent connections during the parallel RunInEachDatabase
|
||||
/// phase of the apply config process.
|
||||
///
|
||||
/// We need a higher concurrency during reconfiguration in case of many DBs,
|
||||
/// but instance is already running and used by client. We can easily get out of
|
||||
/// `max_connections` limit, and the current code won't handle that.
|
||||
///
|
||||
/// Default is 1, but also allow control plane to override this value for specific
|
||||
/// projects. It's also recommended to bump `superuser_reserved_connections` +=
|
||||
/// `reconfigure_concurrency` for such projects to ensure that we always have
|
||||
/// enough spare connections for reconfiguration process to succeed.
|
||||
#[serde(default = "default_reconfigure_concurrency")]
|
||||
pub reconfigure_concurrency: usize,
|
||||
}
|
||||
|
||||
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
|
||||
@@ -331,9 +315,6 @@ mod tests {
|
||||
|
||||
// Features list defaults to empty vector.
|
||||
assert!(spec.features.is_empty());
|
||||
|
||||
// Reconfigure concurrency defaults to 1.
|
||||
assert_eq!(spec.reconfigure_concurrency, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -245,17 +245,6 @@ impl From<NodeAvailability> for NodeAvailabilityWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
/// Scheduling policy enables us to selectively disable some automatic actions that the
|
||||
/// controller performs on a tenant shard. This is only set to a non-default value by
|
||||
/// human intervention, and it is reset to the default value (Active) when the tenant's
|
||||
/// placement policy is modified away from Attached.
|
||||
///
|
||||
/// The typical use of a non-Active scheduling policy is one of:
|
||||
/// - Pinnning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
|
||||
/// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
|
||||
///
|
||||
/// If you're not sure which policy to use to pin a shard to its current location, you probably
|
||||
/// want Pause.
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
|
||||
pub enum ShardSchedulingPolicy {
|
||||
// Normal mode: the tenant's scheduled locations may be updated at will, including
|
||||
|
||||
@@ -158,8 +158,7 @@ impl ShardIdentity {
|
||||
key_to_shard_number(self.count, self.stripe_size, key)
|
||||
}
|
||||
|
||||
/// Return true if the key is stored only on this shard. This does not include
|
||||
/// global keys, see is_key_global().
|
||||
/// Return true if the key should be ingested by this shard
|
||||
///
|
||||
/// Shards must ingest _at least_ keys which return true from this check.
|
||||
pub fn is_key_local(&self, key: &Key) -> bool {
|
||||
@@ -172,7 +171,7 @@ impl ShardIdentity {
|
||||
}
|
||||
|
||||
/// Return true if the key should be stored on all shards, not just one.
|
||||
pub fn is_key_global(&self, key: &Key) -> bool {
|
||||
fn is_key_global(&self, key: &Key) -> bool {
|
||||
if key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key() {
|
||||
// Special keys that are only stored on shard 0
|
||||
false
|
||||
|
||||
@@ -8,14 +8,15 @@ use std::io;
|
||||
use std::num::NonZeroU32;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
|
||||
use azure_core::{Continuable, RetryOptions};
|
||||
use azure_identity::DefaultAzureCredential;
|
||||
use azure_storage::StorageCredentials;
|
||||
use azure_storage_blobs::blob::CopyStatus;
|
||||
use azure_storage_blobs::prelude::ClientBuilder;
|
||||
@@ -75,9 +76,8 @@ impl AzureBlobStorage {
|
||||
let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
|
||||
StorageCredentials::access_key(account.clone(), access_key)
|
||||
} else {
|
||||
let token_credential = azure_identity::create_default_credential()
|
||||
.context("trying to obtain Azure default credentials")?;
|
||||
StorageCredentials::token_credential(token_credential)
|
||||
let token_credential = DefaultAzureCredential::default();
|
||||
StorageCredentials::token_credential(Arc::new(token_credential))
|
||||
};
|
||||
|
||||
// we have an outer retry
|
||||
@@ -624,10 +624,6 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
res
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
super::MAX_KEYS_PER_DELETE_AZURE
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -70,14 +70,7 @@ pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
|
||||
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
|
||||
|
||||
/// As defined in S3 docs
|
||||
///
|
||||
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
|
||||
pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
|
||||
|
||||
/// As defined in Azure docs
|
||||
///
|
||||
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
|
||||
pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
|
||||
pub const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
|
||||
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
|
||||
|
||||
@@ -347,14 +340,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
/// Returns the maximum number of keys that a call to [`Self::delete_objects`] can delete without chunking
|
||||
///
|
||||
/// The value returned is only an optimization hint, One can pass larger number of objects to
|
||||
/// `delete_objects` as well.
|
||||
///
|
||||
/// The value is guaranteed to be >= 1.
|
||||
fn max_keys_per_delete(&self) -> usize;
|
||||
|
||||
/// Deletes all objects matching the given prefix.
|
||||
///
|
||||
/// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
|
||||
@@ -548,16 +533,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
/// [`RemoteStorage::max_keys_per_delete`]
|
||||
pub fn max_keys_per_delete(&self) -> usize {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.max_keys_per_delete(),
|
||||
Self::AwsS3(s) => s.max_keys_per_delete(),
|
||||
Self::AzureBlob(s) => s.max_keys_per_delete(),
|
||||
Self::Unreliable(s) => s.max_keys_per_delete(),
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteStorage::delete_prefix`]
|
||||
pub async fn delete_prefix(
|
||||
&self,
|
||||
|
||||
@@ -573,10 +573,6 @@ impl RemoteStorage for LocalFs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
super::MAX_KEYS_PER_DELETE_S3
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -48,7 +48,7 @@ use crate::{
|
||||
metrics::{start_counting_cancelled_wait, start_measuring_requests},
|
||||
support::PermitCarrying,
|
||||
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
|
||||
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE_S3,
|
||||
RemotePath, RemoteStorage, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
@@ -355,7 +355,7 @@ impl S3Bucket {
|
||||
let kind = RequestKind::Delete;
|
||||
let mut cancel = std::pin::pin!(cancel.cancelled());
|
||||
|
||||
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_S3) {
|
||||
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
let req = self
|
||||
@@ -832,10 +832,6 @@ impl RemoteStorage for S3Bucket {
|
||||
self.delete_oids(&permit, &delete_objects, cancel).await
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
MAX_KEYS_PER_DELETE_S3
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
|
||||
let paths = std::array::from_ref(path);
|
||||
self.delete_objects(paths, cancel).await
|
||||
|
||||
@@ -203,10 +203,6 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn max_keys_per_delete(&self) -> usize {
|
||||
self.inner.max_keys_per_delete()
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -164,12 +164,6 @@ impl TenantShardId {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardNumber {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardSlug<'_> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::TimeoutOrCancel;
|
||||
use remote_storage::MAX_KEYS_PER_DELETE;
|
||||
use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
@@ -130,8 +131,7 @@ impl Deleter {
|
||||
}
|
||||
|
||||
pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
|
||||
let max_keys_per_delete = self.remote_storage.max_keys_per_delete();
|
||||
self.accumulator.reserve(max_keys_per_delete);
|
||||
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
|
||||
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
@@ -156,14 +156,14 @@ impl Deleter {
|
||||
|
||||
match msg {
|
||||
DeleterMessage::Delete(mut list) => {
|
||||
while !list.is_empty() || self.accumulator.len() == max_keys_per_delete {
|
||||
if self.accumulator.len() == max_keys_per_delete {
|
||||
while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
self.flush().await?;
|
||||
// If we have received this number of keys, proceed with attempting to execute
|
||||
assert_eq!(self.accumulator.len(), 0);
|
||||
}
|
||||
|
||||
let available_slots = max_keys_per_delete - self.accumulator.len();
|
||||
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
|
||||
let take_count = std::cmp::min(available_slots, list.len());
|
||||
for path in list.drain(list.len() - take_count..) {
|
||||
self.accumulator.push(path);
|
||||
|
||||
@@ -87,7 +87,7 @@ use crate::tenant::timeline::offload::offload_timeline;
|
||||
use crate::tenant::timeline::offload::OffloadError;
|
||||
use crate::tenant::timeline::CompactFlags;
|
||||
use crate::tenant::timeline::CompactOptions;
|
||||
use crate::tenant::timeline::CompactRequest;
|
||||
use crate::tenant::timeline::CompactRange;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::GetTimelineError;
|
||||
@@ -1978,26 +1978,6 @@ async fn timeline_gc_handler(
|
||||
json_response(StatusCode::OK, gc_result)
|
||||
}
|
||||
|
||||
// Cancel scheduled compaction tasks
|
||||
async fn timeline_cancel_compact_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
let state = get_state(&request);
|
||||
async {
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
tenant.cancel_scheduled_compaction(timeline_id);
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
.instrument(info_span!("timeline_cancel_compact", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
|
||||
.await
|
||||
}
|
||||
|
||||
// Run compaction immediately on given timeline.
|
||||
async fn timeline_compact_handler(
|
||||
mut request: Request<Body>,
|
||||
@@ -2007,7 +1987,7 @@ async fn timeline_compact_handler(
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let compact_request = json_request_maybe::<Option<CompactRequest>>(&mut request).await?;
|
||||
let compact_range = json_request_maybe::<Option<CompactRange>>(&mut request).await?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
@@ -2032,50 +2012,22 @@ async fn timeline_compact_handler(
|
||||
let wait_until_uploaded =
|
||||
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
|
||||
|
||||
let wait_until_scheduled_compaction_done =
|
||||
parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
|
||||
.unwrap_or(false);
|
||||
|
||||
let sub_compaction = compact_request
|
||||
.as_ref()
|
||||
.map(|r| r.sub_compaction)
|
||||
.unwrap_or(false);
|
||||
let options = CompactOptions {
|
||||
compact_range: compact_request
|
||||
.as_ref()
|
||||
.and_then(|r| r.compact_range.clone()),
|
||||
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
|
||||
compact_range,
|
||||
flags,
|
||||
sub_compaction,
|
||||
};
|
||||
|
||||
let scheduled = compact_request
|
||||
.as_ref()
|
||||
.map(|r| r.scheduled)
|
||||
.unwrap_or(false);
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
|
||||
if scheduled {
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
let rx = tenant.schedule_compaction(timeline_id, options).await.map_err(ApiError::InternalServerError)?;
|
||||
if wait_until_scheduled_compaction_done {
|
||||
// It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
|
||||
rx.await.ok();
|
||||
}
|
||||
} else {
|
||||
timeline
|
||||
.compact_with_options(&cancel, options, &ctx)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
if wait_until_uploaded {
|
||||
timeline.remote_client.wait_completion().await
|
||||
// XXX map to correct ApiError for the cases where it's due to shutdown
|
||||
.context("wait completion").map_err(ApiError::InternalServerError)?;
|
||||
}
|
||||
timeline
|
||||
.compact_with_options(&cancel, options, &ctx)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
if wait_until_uploaded {
|
||||
timeline.remote_client.wait_completion().await
|
||||
// XXX map to correct ApiError for the cases where it's due to shutdown
|
||||
.context("wait completion").map_err(ApiError::InternalServerError)?;
|
||||
}
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
@@ -2156,20 +2108,16 @@ async fn timeline_checkpoint_handler(
|
||||
// By default, checkpoints come with a compaction, but this may be optionally disabled by tests that just want to flush + upload.
|
||||
let compact = parse_query_param::<_, bool>(&request, "compact")?.unwrap_or(true);
|
||||
|
||||
let wait_until_flushed: bool =
|
||||
parse_query_param(&request, "wait_until_flushed")?.unwrap_or(true);
|
||||
|
||||
let wait_until_uploaded =
|
||||
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
|
||||
if wait_until_flushed {
|
||||
timeline.freeze_and_flush().await
|
||||
} else {
|
||||
timeline.freeze().await.and(Ok(()))
|
||||
}.map_err(|e| {
|
||||
timeline
|
||||
.freeze_and_flush()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
match e {
|
||||
tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
|
||||
other => ApiError::InternalServerError(other.into()),
|
||||
@@ -3353,10 +3301,6 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|
||||
|r| api_handler(r, timeline_compact_handler),
|
||||
)
|
||||
.delete(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|
||||
|r| api_handler(r, timeline_cancel_compact_handler),
|
||||
)
|
||||
.put(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload",
|
||||
|r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler),
|
||||
|
||||
@@ -464,24 +464,6 @@ static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static DISK_CONSISTENT_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_disk_consistent_lsn",
|
||||
"Disk consistent LSN grouped by timeline",
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PROJECTED_REMOTE_CONSISTENT_LSN: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_projected_remote_consistent_lsn",
|
||||
"Projected remote consistent LSN grouped by timeline",
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PITR_HISTORY_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_pitr_history_size",
|
||||
@@ -1223,60 +1205,31 @@ pub(crate) mod virtual_file_io_engine {
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
|
||||
pub(crate) struct SmgrOpTimerInner {
|
||||
pub(crate) struct SmgrOpTimer {
|
||||
global_latency_histo: Histogram,
|
||||
|
||||
// Optional because not all op types are tracked per-timeline
|
||||
per_timeline_latency_histo: Option<Histogram>,
|
||||
|
||||
global_flush_in_progress_micros: IntCounter,
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
|
||||
start: Instant,
|
||||
throttled: Duration,
|
||||
op: SmgrQueryType,
|
||||
}
|
||||
|
||||
pub(crate) struct SmgrOpFlushInProgress {
|
||||
base: Instant,
|
||||
global_micros: IntCounter,
|
||||
per_timeline_micros: IntCounter,
|
||||
}
|
||||
|
||||
impl SmgrOpTimer {
|
||||
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
|
||||
let Some(throttle) = throttle else {
|
||||
return;
|
||||
};
|
||||
let inner = self.0.as_mut().expect("other public methods consume self");
|
||||
inner.throttled += *throttle;
|
||||
self.throttled += *throttle;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
|
||||
let (flush_start, inner) = self
|
||||
.smgr_op_end()
|
||||
.expect("this method consume self, and the only other caller is drop handler");
|
||||
let SmgrOpTimerInner {
|
||||
global_flush_in_progress_micros,
|
||||
per_timeline_flush_in_progress_micros,
|
||||
..
|
||||
} = inner;
|
||||
SmgrOpFlushInProgress {
|
||||
base: flush_start,
|
||||
global_micros: global_flush_in_progress_micros,
|
||||
per_timeline_micros: per_timeline_flush_in_progress_micros,
|
||||
}
|
||||
}
|
||||
impl Drop for SmgrOpTimer {
|
||||
fn drop(&mut self) {
|
||||
let elapsed = self.start.elapsed();
|
||||
|
||||
/// Returns `None`` if this method has already been called, `Some` otherwise.
|
||||
fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
|
||||
let inner = self.0.take()?;
|
||||
|
||||
let now = Instant::now();
|
||||
let elapsed = now - inner.start;
|
||||
|
||||
let elapsed = match elapsed.checked_sub(inner.throttled) {
|
||||
let elapsed = match elapsed.checked_sub(self.throttled) {
|
||||
Some(elapsed) => elapsed,
|
||||
None => {
|
||||
use utils::rate_limit::RateLimit;
|
||||
@@ -1287,9 +1240,9 @@ impl SmgrOpTimer {
|
||||
})))
|
||||
});
|
||||
let mut guard = LOGGED.lock().unwrap();
|
||||
let rate_limit = &mut guard[inner.op];
|
||||
let rate_limit = &mut guard[self.op];
|
||||
rate_limit.call(|| {
|
||||
warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
|
||||
warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
|
||||
});
|
||||
elapsed // un-throttled time, more info than just saturating to 0
|
||||
}
|
||||
@@ -1297,54 +1250,10 @@ impl SmgrOpTimer {
|
||||
|
||||
let elapsed = elapsed.as_secs_f64();
|
||||
|
||||
inner.global_latency_histo.observe(elapsed);
|
||||
if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
|
||||
self.global_latency_histo.observe(elapsed);
|
||||
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
|
||||
per_timeline_getpage_histo.observe(elapsed);
|
||||
}
|
||||
|
||||
Some((now, inner))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SmgrOpTimer {
|
||||
fn drop(&mut self) {
|
||||
self.smgr_op_end();
|
||||
}
|
||||
}
|
||||
|
||||
impl SmgrOpFlushInProgress {
|
||||
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
|
||||
where
|
||||
Fut: std::future::Future<Output = O>,
|
||||
{
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
|
||||
let now = Instant::now();
|
||||
// Whenever observe_guard gets called, or dropped,
|
||||
// it adds the time elapsed since its last call to metrics.
|
||||
// Last call is tracked in `now`.
|
||||
let mut observe_guard = scopeguard::guard(
|
||||
|| {
|
||||
let elapsed = now - self.base;
|
||||
self.global_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.per_timeline_micros
|
||||
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||
self.base = now;
|
||||
},
|
||||
|mut observe| {
|
||||
observe();
|
||||
},
|
||||
);
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
|
||||
Ok(v) => return v,
|
||||
Err(_timeout) => {
|
||||
(*observe_guard)();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1375,8 +1284,6 @@ pub(crate) struct SmgrQueryTimePerTimeline {
|
||||
per_timeline_getpage_latency: Histogram,
|
||||
global_batch_size: Histogram,
|
||||
per_timeline_batch_size: Histogram,
|
||||
global_flush_in_progress_micros: IntCounter,
|
||||
per_timeline_flush_in_progress_micros: IntCounter,
|
||||
}
|
||||
|
||||
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
@@ -1539,26 +1446,6 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
|
||||
.set(value.try_into().unwrap());
|
||||
}
|
||||
|
||||
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_service_pagestream_flush_in_progress_micros",
|
||||
"Counter that sums up the microseconds that a pagestream response was being flushed into the TCP connection. \
|
||||
If the flush is particularly slow, this counter will be updated periodically to make slow flushes \
|
||||
easily discoverable in monitoring. \
|
||||
Hence, this is NOT a completion latency historgram.",
|
||||
&["tenant_id", "shard_id", "timeline_id"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_page_service_pagestream_flush_in_progress_micros_global",
|
||||
"Like pageserver_page_service_pagestream_flush_in_progress_seconds, but instance-wide.",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
impl SmgrQueryTimePerTimeline {
|
||||
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
@@ -1599,12 +1486,6 @@ impl SmgrQueryTimePerTimeline {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let global_flush_in_progress_micros =
|
||||
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
|
||||
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
global_started,
|
||||
global_latency,
|
||||
@@ -1612,8 +1493,6 @@ impl SmgrQueryTimePerTimeline {
|
||||
per_timeline_getpage_started,
|
||||
global_batch_size,
|
||||
per_timeline_batch_size,
|
||||
global_flush_in_progress_micros,
|
||||
per_timeline_flush_in_progress_micros,
|
||||
}
|
||||
}
|
||||
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
|
||||
@@ -1626,17 +1505,13 @@ impl SmgrQueryTimePerTimeline {
|
||||
None
|
||||
};
|
||||
|
||||
SmgrOpTimer(Some(SmgrOpTimerInner {
|
||||
SmgrOpTimer {
|
||||
global_latency_histo: self.global_latency[op as usize].clone(),
|
||||
per_timeline_latency_histo,
|
||||
start: started_at,
|
||||
op,
|
||||
throttled: Duration::ZERO,
|
||||
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
|
||||
per_timeline_flush_in_progress_micros: self
|
||||
.per_timeline_flush_in_progress_micros
|
||||
.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
|
||||
@@ -2311,15 +2186,6 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
|
||||
.expect("failed to define a metric"),
|
||||
});
|
||||
|
||||
pub(crate) static PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_timeline_wal_records_received",
|
||||
"Number of WAL records received per shard",
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wal_redo_seconds",
|
||||
@@ -2528,8 +2394,7 @@ pub(crate) struct TimelineMetrics {
|
||||
pub load_layer_map_histo: StorageTimeMetrics,
|
||||
pub garbage_collect_histo: StorageTimeMetrics,
|
||||
pub find_gc_cutoffs_histo: StorageTimeMetrics,
|
||||
pub last_record_lsn_gauge: IntGauge,
|
||||
pub disk_consistent_lsn_gauge: IntGauge,
|
||||
pub last_record_gauge: IntGauge,
|
||||
pub pitr_history_size: UIntGauge,
|
||||
pub archival_size: UIntGauge,
|
||||
pub(crate) layer_size_image: UIntGauge,
|
||||
@@ -2547,7 +2412,6 @@ pub(crate) struct TimelineMetrics {
|
||||
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
|
||||
/// Number of valid LSN leases.
|
||||
pub valid_lsn_lease_count_gauge: UIntGauge,
|
||||
pub wal_records_received: IntCounter,
|
||||
shutdown: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
@@ -2611,11 +2475,7 @@ impl TimelineMetrics {
|
||||
&shard_id,
|
||||
&timeline_id,
|
||||
);
|
||||
let last_record_lsn_gauge = LAST_RECORD_LSN
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let disk_consistent_lsn_gauge = DISK_CONSISTENT_LSN
|
||||
let last_record_gauge = LAST_RECORD_LSN
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
@@ -2705,10 +2565,6 @@ impl TimelineMetrics {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let wal_records_received = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
shard_id,
|
||||
@@ -2722,8 +2578,7 @@ impl TimelineMetrics {
|
||||
garbage_collect_histo,
|
||||
find_gc_cutoffs_histo,
|
||||
load_layer_map_histo,
|
||||
last_record_lsn_gauge,
|
||||
disk_consistent_lsn_gauge,
|
||||
last_record_gauge,
|
||||
pitr_history_size,
|
||||
archival_size,
|
||||
layer_size_image,
|
||||
@@ -2741,7 +2596,6 @@ impl TimelineMetrics {
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
valid_lsn_lease_count_gauge,
|
||||
wal_records_received,
|
||||
shutdown: std::sync::atomic::AtomicBool::default(),
|
||||
}
|
||||
}
|
||||
@@ -2788,7 +2642,6 @@ impl TimelineMetrics {
|
||||
let timeline_id = &self.timeline_id;
|
||||
let shard_id = &self.shard_id;
|
||||
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
{
|
||||
@@ -2879,16 +2732,6 @@ impl TimelineMetrics {
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
let _ = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED.remove_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
let _ = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS.remove_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2962,7 +2805,6 @@ pub(crate) struct RemoteTimelineClientMetrics {
|
||||
calls: Mutex<HashMap<(&'static str, &'static str), IntCounterPair>>,
|
||||
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
pub(crate) projected_remote_consistent_lsn_gauge: UIntGauge,
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientMetrics {
|
||||
@@ -2977,10 +2819,6 @@ impl RemoteTimelineClientMetrics {
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let projected_remote_consistent_lsn_gauge = PROJECTED_REMOTE_CONSISTENT_LSN
|
||||
.get_metric_with_label_values(&[&tenant_id_str, &shard_id_str, &timeline_id_str])
|
||||
.unwrap();
|
||||
|
||||
RemoteTimelineClientMetrics {
|
||||
tenant_id: tenant_id_str,
|
||||
shard_id: shard_id_str,
|
||||
@@ -2989,7 +2827,6 @@ impl RemoteTimelineClientMetrics {
|
||||
bytes_started_counter: Mutex::new(HashMap::default()),
|
||||
bytes_finished_counter: Mutex::new(HashMap::default()),
|
||||
remote_physical_size_gauge,
|
||||
projected_remote_consistent_lsn_gauge,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3203,7 +3040,6 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
calls,
|
||||
bytes_started_counter,
|
||||
bytes_finished_counter,
|
||||
projected_remote_consistent_lsn_gauge,
|
||||
} = self;
|
||||
for ((a, b), _) in calls.get_mut().unwrap().drain() {
|
||||
let mut res = [Ok(()), Ok(())];
|
||||
@@ -3233,14 +3069,6 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
|
||||
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
}
|
||||
{
|
||||
let _ = projected_remote_consistent_lsn_gauge;
|
||||
let _ = PROJECTED_REMOTE_CONSISTENT_LSN.remove_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1017,8 +1017,10 @@ impl PageServerHandler {
|
||||
// Map handler result to protocol behavior.
|
||||
// Some handler errors cause exit from pagestream protocol.
|
||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||
let mut timers: smallvec::SmallVec<[_; 1]> =
|
||||
smallvec::SmallVec::with_capacity(handler_results.len());
|
||||
for handler_result in handler_results {
|
||||
let (response_msg, timer) = match handler_result {
|
||||
let response_msg = match handler_result {
|
||||
Err(e) => match &e {
|
||||
PageStreamError::Shutdown => {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
@@ -1042,66 +1044,34 @@ impl PageServerHandler {
|
||||
span.in_scope(|| {
|
||||
error!("error reading relation or page version: {full:#}")
|
||||
});
|
||||
(
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
}),
|
||||
None, // TODO: measure errors
|
||||
)
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
}
|
||||
},
|
||||
Ok((response_msg, timer)) => (response_msg, Some(timer)),
|
||||
Ok((response_msg, timer)) => {
|
||||
// Extending the lifetime of the timers so observations on drop
|
||||
// include the flush time.
|
||||
timers.push(timer);
|
||||
response_msg
|
||||
}
|
||||
};
|
||||
|
||||
//
|
||||
// marshal & transmit response message
|
||||
//
|
||||
|
||||
pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||
|
||||
// We purposefully don't count flush time into the timer.
|
||||
//
|
||||
// The reason is that current compute client will not perform protocol processing
|
||||
// if the postgres backend process is doing things other than `->smgr_read()`.
|
||||
// This is especially the case for prefetch.
|
||||
//
|
||||
// If the compute doesn't read from the connection, eventually TCP will backpressure
|
||||
// all the way into our flush call below.
|
||||
//
|
||||
// The timer's underlying metric is used for a storage-internal latency SLO and
|
||||
// we don't want to include latency in it that we can't control.
|
||||
// And as pointed out above, in this case, we don't control the time that flush will take.
|
||||
let flushing_timer =
|
||||
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
|
||||
|
||||
// what we want to do
|
||||
let flush_fut = pgb_writer.flush();
|
||||
// metric for how long flushing takes
|
||||
let flush_fut = match flushing_timer {
|
||||
Some(flushing_timer) => {
|
||||
futures::future::Either::Left(flushing_timer.measure(flush_fut))
|
||||
}
|
||||
None => futures::future::Either::Right(flush_fut),
|
||||
};
|
||||
// do it while respecting cancellation
|
||||
let _: () = async move {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = cancel.cancelled() => {
|
||||
// We were requested to shut down.
|
||||
info!("shutdown request received in page handler");
|
||||
return Err(QueryError::Shutdown)
|
||||
}
|
||||
res = flush_fut => {
|
||||
res?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// and log the info! line inside the request span
|
||||
.instrument(span.clone())
|
||||
.await?;
|
||||
}
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = cancel.cancelled() => {
|
||||
// We were requested to shut down.
|
||||
info!("shutdown request received in page handler");
|
||||
return Err(QueryError::Shutdown)
|
||||
}
|
||||
res = pgb_writer.flush() => {
|
||||
res?;
|
||||
}
|
||||
}
|
||||
drop(timers);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -37,19 +37,14 @@ use remote_timeline_client::manifest::{
|
||||
};
|
||||
use remote_timeline_client::UploadQueueNotReadyError;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::Weak;
|
||||
use std::time::SystemTime;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use timeline::compaction::ScheduledCompactionTask;
|
||||
use timeline::import_pgdata;
|
||||
use timeline::offload::offload_timeline;
|
||||
use timeline::CompactFlags;
|
||||
use timeline::CompactOptions;
|
||||
use timeline::CompactionError;
|
||||
use timeline::ShutdownMode;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::watch;
|
||||
@@ -344,11 +339,6 @@ pub struct Tenant {
|
||||
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
|
||||
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
|
||||
|
||||
/// Scheduled compaction tasks. Currently, this can only be populated by triggering
|
||||
/// a manual gc-compaction from the manual compaction API.
|
||||
scheduled_compaction_tasks:
|
||||
std::sync::Mutex<HashMap<TimelineId, VecDeque<ScheduledCompactionTask>>>,
|
||||
|
||||
/// If the tenant is in Activating state, notify this to encourage it
|
||||
/// to proceed to Active as soon as possible, rather than waiting for lazy
|
||||
/// background warmup.
|
||||
@@ -2963,109 +2953,27 @@ impl Tenant {
|
||||
|
||||
for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload
|
||||
{
|
||||
// pending_task_left == None: cannot compact, maybe still pending tasks
|
||||
// pending_task_left == Some(true): compaction task left
|
||||
// pending_task_left == Some(false): no compaction task left
|
||||
let pending_task_left = if *can_compact {
|
||||
let has_pending_l0_compaction_task = timeline
|
||||
.compact(cancel, EnumSet::empty(), ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await
|
||||
.inspect_err(|e| match e {
|
||||
timeline::CompactionError::ShuttingDown => (),
|
||||
timeline::CompactionError::Offload(_) => {
|
||||
// Failures to offload timelines do not trip the circuit breaker, because
|
||||
// they do not do lots of writes the way compaction itself does: it is cheap
|
||||
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
|
||||
}
|
||||
timeline::CompactionError::Other(e) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
|
||||
}
|
||||
})?;
|
||||
if has_pending_l0_compaction_task {
|
||||
Some(true)
|
||||
} else {
|
||||
let mut has_pending_scheduled_compaction_task;
|
||||
let next_scheduled_compaction_task = {
|
||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
|
||||
if !tline_pending_tasks.is_empty() {
|
||||
info!(
|
||||
"{} tasks left in the compaction schedule queue",
|
||||
tline_pending_tasks.len()
|
||||
);
|
||||
Some(
|
||||
timeline
|
||||
.compact(cancel, EnumSet::empty(), ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await
|
||||
.inspect_err(|e| match e {
|
||||
timeline::CompactionError::ShuttingDown => (),
|
||||
timeline::CompactionError::Offload(_) => {
|
||||
// Failures to offload timelines do not trip the circuit breaker, because
|
||||
// they do not do lots of writes the way compaction itself does: it is cheap
|
||||
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
|
||||
}
|
||||
let next_task = tline_pending_tasks.pop_front();
|
||||
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
|
||||
next_task
|
||||
} else {
|
||||
has_pending_scheduled_compaction_task = false;
|
||||
None
|
||||
}
|
||||
};
|
||||
if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task
|
||||
{
|
||||
if !next_scheduled_compaction_task
|
||||
.options
|
||||
.flags
|
||||
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
|
||||
{
|
||||
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
|
||||
} else if next_scheduled_compaction_task.options.sub_compaction {
|
||||
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
|
||||
let jobs = timeline
|
||||
.gc_compaction_split_jobs(next_scheduled_compaction_task.options)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
if jobs.is_empty() {
|
||||
info!("no jobs to run, skipping scheduled compaction task");
|
||||
} else {
|
||||
has_pending_scheduled_compaction_task = true;
|
||||
let jobs_len = jobs.len();
|
||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
|
||||
for (idx, job) in jobs.into_iter().enumerate() {
|
||||
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
|
||||
ScheduledCompactionTask {
|
||||
options: job,
|
||||
// The last job in the queue sends the signal and releases the gc guard
|
||||
result_tx: next_scheduled_compaction_task
|
||||
.result_tx
|
||||
.take(),
|
||||
gc_block: next_scheduled_compaction_task
|
||||
.gc_block
|
||||
.take(),
|
||||
}
|
||||
} else {
|
||||
ScheduledCompactionTask {
|
||||
options: job,
|
||||
result_tx: None,
|
||||
gc_block: None,
|
||||
}
|
||||
});
|
||||
}
|
||||
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
|
||||
timeline::CompactionError::Other(e) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
|
||||
}
|
||||
} else {
|
||||
let _ = timeline
|
||||
.compact_with_options(
|
||||
cancel,
|
||||
next_scheduled_compaction_task.options,
|
||||
ctx,
|
||||
)
|
||||
.instrument(info_span!("scheduled_compact_timeline", %timeline_id))
|
||||
.await?;
|
||||
if let Some(tx) = next_scheduled_compaction_task.result_tx.take() {
|
||||
// TODO: we can send compaction statistics in the future
|
||||
tx.send(()).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(has_pending_scheduled_compaction_task)
|
||||
}
|
||||
})?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -3085,43 +2993,6 @@ impl Tenant {
|
||||
Ok(has_pending_task)
|
||||
}
|
||||
|
||||
/// Cancel scheduled compaction tasks
|
||||
pub(crate) fn cancel_scheduled_compaction(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
) -> Vec<ScheduledCompactionTask> {
|
||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) {
|
||||
let current_tline_pending_tasks = std::mem::take(tline_pending_tasks);
|
||||
current_tline_pending_tasks.into_iter().collect()
|
||||
} else {
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Schedule a compaction task for a timeline.
|
||||
pub(crate) async fn schedule_compaction(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
options: CompactOptions,
|
||||
) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
|
||||
let gc_guard = match self.gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
bail!("cannot run gc-compaction because gc is blocked: {}", e);
|
||||
}
|
||||
};
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||
let tline_pending_tasks = guard.entry(timeline_id).or_default();
|
||||
tline_pending_tasks.push_back(ScheduledCompactionTask {
|
||||
options,
|
||||
result_tx: Some(tx),
|
||||
gc_block: Some(gc_guard),
|
||||
});
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
// Call through to all timelines to freeze ephemeral layers if needed. Usually
|
||||
// this happens during ingest: this background housekeeping is for freezing layers
|
||||
// that are open but haven't been written to for some time.
|
||||
@@ -4134,7 +4005,6 @@ impl Tenant {
|
||||
// use an extremely long backoff.
|
||||
Some(Duration::from_secs(3600 * 24)),
|
||||
)),
|
||||
scheduled_compaction_tasks: Mutex::new(Default::default()),
|
||||
activate_now_sem: tokio::sync::Semaphore::new(0),
|
||||
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
|
||||
cancel: CancellationToken::default(),
|
||||
@@ -8166,12 +8036,6 @@ mod tests {
|
||||
)
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
guard.cutoffs.time = Lsn(0x30);
|
||||
@@ -8274,12 +8138,6 @@ mod tests {
|
||||
|
||||
// increase GC horizon and compact again
|
||||
{
|
||||
tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x40))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
guard.cutoffs.time = Lsn(0x40);
|
||||
@@ -8660,12 +8518,6 @@ mod tests {
|
||||
.await?
|
||||
};
|
||||
{
|
||||
tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
*guard = GcInfo {
|
||||
@@ -8747,12 +8599,6 @@ mod tests {
|
||||
|
||||
// increase GC horizon and compact again
|
||||
{
|
||||
tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x40))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
guard.cutoffs.time = Lsn(0x40);
|
||||
@@ -9200,12 +9046,6 @@ mod tests {
|
||||
)
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
*guard = GcInfo {
|
||||
@@ -9323,7 +9163,6 @@ mod tests {
|
||||
CompactOptions {
|
||||
flags: dryrun_flags,
|
||||
compact_range: None,
|
||||
..Default::default()
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
@@ -9348,12 +9187,6 @@ mod tests {
|
||||
|
||||
// increase GC horizon and compact again
|
||||
{
|
||||
tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x38))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
guard.cutoffs.time = Lsn(0x38);
|
||||
@@ -9449,12 +9282,6 @@ mod tests {
|
||||
)
|
||||
.await?;
|
||||
{
|
||||
tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
*guard = GcInfo {
|
||||
@@ -9572,7 +9399,6 @@ mod tests {
|
||||
CompactOptions {
|
||||
flags: dryrun_flags,
|
||||
compact_range: None,
|
||||
..Default::default()
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
@@ -9699,12 +9525,6 @@ mod tests {
|
||||
branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));
|
||||
|
||||
{
|
||||
parent_tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x10))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = parent_tline.gc_info.write().unwrap();
|
||||
*guard = GcInfo {
|
||||
@@ -9719,12 +9539,6 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
branch_tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x50))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = branch_tline.gc_info.write().unwrap();
|
||||
*guard = GcInfo {
|
||||
@@ -10054,12 +9868,6 @@ mod tests {
|
||||
.await?;
|
||||
|
||||
{
|
||||
tline
|
||||
.latest_gc_cutoff_lsn
|
||||
.lock_for_write()
|
||||
.store_and_unlock(Lsn(0x30))
|
||||
.wait()
|
||||
.await;
|
||||
// Update GC info
|
||||
let mut guard = tline.gc_info.write().unwrap();
|
||||
*guard = GcInfo {
|
||||
@@ -10077,15 +9885,7 @@ mod tests {
|
||||
|
||||
// Do a partial compaction on key range 0..2
|
||||
tline
|
||||
.compact_with_gc(
|
||||
&cancel,
|
||||
CompactOptions {
|
||||
flags: EnumSet::new(),
|
||||
compact_range: Some((get_key(0)..get_key(2)).into()),
|
||||
..Default::default()
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.partial_compact_with_gc(get_key(0)..get_key(2), &cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
|
||||
@@ -10124,15 +9924,7 @@ mod tests {
|
||||
|
||||
// Do a partial compaction on key range 2..4
|
||||
tline
|
||||
.compact_with_gc(
|
||||
&cancel,
|
||||
CompactOptions {
|
||||
flags: EnumSet::new(),
|
||||
compact_range: Some((get_key(2)..get_key(4)).into()),
|
||||
..Default::default()
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.partial_compact_with_gc(get_key(2)..get_key(4), &cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
|
||||
@@ -10176,15 +9968,7 @@ mod tests {
|
||||
|
||||
// Do a partial compaction on key range 4..9
|
||||
tline
|
||||
.compact_with_gc(
|
||||
&cancel,
|
||||
CompactOptions {
|
||||
flags: EnumSet::new(),
|
||||
compact_range: Some((get_key(4)..get_key(9)).into()),
|
||||
..Default::default()
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.partial_compact_with_gc(get_key(4)..get_key(9), &cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
|
||||
@@ -10227,15 +10011,7 @@ mod tests {
|
||||
|
||||
// Do a partial compaction on key range 9..10
|
||||
tline
|
||||
.compact_with_gc(
|
||||
&cancel,
|
||||
CompactOptions {
|
||||
flags: EnumSet::new(),
|
||||
compact_range: Some((get_key(9)..get_key(10)).into()),
|
||||
..Default::default()
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.partial_compact_with_gc(get_key(9)..get_key(10), &cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
|
||||
@@ -10283,15 +10059,7 @@ mod tests {
|
||||
|
||||
// Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones.
|
||||
tline
|
||||
.compact_with_gc(
|
||||
&cancel,
|
||||
CompactOptions {
|
||||
flags: EnumSet::new(),
|
||||
compact_range: Some((get_key(0)..get_key(10)).into()),
|
||||
..Default::default()
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.partial_compact_with_gc(get_key(0)..get_key(10), &cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use utils::id::TimelineId;
|
||||
|
||||
@@ -20,7 +20,7 @@ pub(crate) struct GcBlock {
|
||||
/// Do not add any more features taking and forbidding taking this lock. It should be
|
||||
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
|
||||
/// synchronizes with gc attempts by locking and unlocking this mutex.
|
||||
blocking: Arc<tokio::sync::Mutex<()>>,
|
||||
blocking: tokio::sync::Mutex<()>,
|
||||
}
|
||||
|
||||
impl GcBlock {
|
||||
@@ -30,7 +30,7 @@ impl GcBlock {
|
||||
/// it's ending, or if not currently possible, a value describing the reasons why not.
|
||||
///
|
||||
/// Cancellation safe.
|
||||
pub(super) async fn start(&self) -> Result<Guard, BlockingReasons> {
|
||||
pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
|
||||
let reasons = {
|
||||
let g = self.reasons.lock().unwrap();
|
||||
|
||||
@@ -44,7 +44,7 @@ impl GcBlock {
|
||||
Err(reasons)
|
||||
} else {
|
||||
Ok(Guard {
|
||||
_inner: self.blocking.clone().lock_owned().await,
|
||||
_inner: self.blocking.lock().await,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -170,8 +170,8 @@ impl GcBlock {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Guard {
|
||||
_inner: tokio::sync::OwnedMutexGuard<()>,
|
||||
pub(super) struct Guard<'a> {
|
||||
_inner: tokio::sync::MutexGuard<'a, ()>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -2192,9 +2192,6 @@ impl RemoteTimelineClient {
|
||||
upload_queue.clean.1 = Some(task.task_id);
|
||||
|
||||
let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
|
||||
self.metrics
|
||||
.projected_remote_consistent_lsn_gauge
|
||||
.set(lsn.0);
|
||||
|
||||
if self.generation.is_none() {
|
||||
// Legacy mode: skip validating generation
|
||||
|
||||
@@ -53,7 +53,7 @@ use utils::{
|
||||
postgres_client::PostgresClientProtocol,
|
||||
sync::gate::{Gate, GateGuard},
|
||||
};
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
@@ -768,7 +768,7 @@ pub enum GetLogicalSizePriority {
|
||||
Background,
|
||||
}
|
||||
|
||||
#[derive(Debug, enumset::EnumSetType)]
|
||||
#[derive(enumset::EnumSetType)]
|
||||
pub(crate) enum CompactFlags {
|
||||
ForceRepartition,
|
||||
ForceImageLayerCreation,
|
||||
@@ -777,19 +777,6 @@ pub(crate) enum CompactFlags {
|
||||
DryRun,
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
pub(crate) struct CompactRequest {
|
||||
pub compact_range: Option<CompactRange>,
|
||||
pub compact_below_lsn: Option<Lsn>,
|
||||
/// Whether the compaction job should be scheduled.
|
||||
#[serde(default)]
|
||||
pub scheduled: bool,
|
||||
/// Whether the compaction job should be split across key ranges.
|
||||
#[serde(default)]
|
||||
pub sub_compaction: bool,
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
pub(crate) struct CompactRange {
|
||||
@@ -799,27 +786,10 @@ pub(crate) struct CompactRange {
|
||||
pub end: Key,
|
||||
}
|
||||
|
||||
impl From<Range<Key>> for CompactRange {
|
||||
fn from(range: Range<Key>) -> Self {
|
||||
CompactRange {
|
||||
start: range.start,
|
||||
end: range.end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct CompactOptions {
|
||||
pub flags: EnumSet<CompactFlags>,
|
||||
/// If set, the compaction will only compact the key range specified by this option.
|
||||
/// This option is only used by GC compaction.
|
||||
pub compact_range: Option<CompactRange>,
|
||||
/// If set, the compaction will only compact the LSN below this value.
|
||||
/// This option is only used by GC compaction.
|
||||
pub compact_below_lsn: Option<Lsn>,
|
||||
/// Enable sub-compaction (split compaction job across key ranges).
|
||||
/// This option is only used by GC compaction.
|
||||
pub sub_compaction: bool,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Timeline {
|
||||
@@ -1463,31 +1433,23 @@ impl Timeline {
|
||||
Ok(lease)
|
||||
}
|
||||
|
||||
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
|
||||
/// Returns the flush request ID which can be awaited with wait_flush_completion().
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
|
||||
pub(crate) async fn freeze(&self) -> Result<u64, FlushLayerError> {
|
||||
self.freeze0().await
|
||||
}
|
||||
|
||||
/// Freeze and flush the open in-memory layer, waiting for it to be written to disk.
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
|
||||
pub(crate) async fn freeze_and_flush(&self) -> Result<(), FlushLayerError> {
|
||||
self.freeze_and_flush0().await
|
||||
}
|
||||
|
||||
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
|
||||
/// Returns the flush request ID which can be awaited with wait_flush_completion().
|
||||
pub(crate) async fn freeze0(&self) -> Result<u64, FlushLayerError> {
|
||||
let mut g = self.write_lock.lock().await;
|
||||
let to_lsn = self.get_last_record_lsn();
|
||||
self.freeze_inmem_layer_at(to_lsn, &mut g).await
|
||||
}
|
||||
|
||||
// This exists to provide a non-span creating version of `freeze_and_flush` we can call without
|
||||
// polluting the span hierarchy.
|
||||
pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
|
||||
let token = self.freeze0().await?;
|
||||
let token = {
|
||||
// Freeze the current open in-memory layer. It will be written to disk on next
|
||||
// iteration.
|
||||
let mut g = self.write_lock.lock().await;
|
||||
|
||||
let to_lsn = self.get_last_record_lsn();
|
||||
self.freeze_inmem_layer_at(to_lsn, &mut g).await?
|
||||
};
|
||||
self.wait_flush_completion(token).await
|
||||
}
|
||||
|
||||
@@ -1642,8 +1604,6 @@ impl Timeline {
|
||||
CompactOptions {
|
||||
flags,
|
||||
compact_range: None,
|
||||
compact_below_lsn: None,
|
||||
sub_compaction: false,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
@@ -2399,7 +2359,7 @@ impl Timeline {
|
||||
|
||||
result
|
||||
.metrics
|
||||
.last_record_lsn_gauge
|
||||
.last_record_gauge
|
||||
.set(disk_consistent_lsn.0 as i64);
|
||||
result
|
||||
})
|
||||
@@ -3521,7 +3481,7 @@ impl Timeline {
|
||||
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
|
||||
assert!(new_lsn.is_aligned());
|
||||
|
||||
self.metrics.last_record_lsn_gauge.set(new_lsn.0 as i64);
|
||||
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
|
||||
@@ -3889,10 +3849,6 @@ impl Timeline {
|
||||
fn set_disk_consistent_lsn(&self, new_value: Lsn) -> bool {
|
||||
let old_value = self.disk_consistent_lsn.fetch_max(new_value);
|
||||
assert!(new_value >= old_value, "disk_consistent_lsn must be growing monotonously at runtime; current {old_value}, offered {new_value}");
|
||||
|
||||
self.metrics
|
||||
.disk_consistent_lsn_gauge
|
||||
.set(new_value.0 as i64);
|
||||
new_value != old_value
|
||||
}
|
||||
|
||||
@@ -5931,23 +5887,6 @@ impl<'a> TimelineWriter<'a> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// In debug builds, assert that we don't write any keys that don't belong to this shard.
|
||||
// We don't assert this in release builds, since key ownership policies may change over
|
||||
// time. Stray keys will be removed during compaction.
|
||||
if cfg!(debug_assertions) {
|
||||
for metadata in &batch.metadata {
|
||||
if let ValueMeta::Serialized(metadata) = metadata {
|
||||
let key = Key::from_compact(metadata.key);
|
||||
assert!(
|
||||
self.shard_identity.is_key_local(&key)
|
||||
|| self.shard_identity.is_key_global(&key),
|
||||
"key {key} does not belong on shard {}",
|
||||
self.shard_identity.shard_index()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let batch_max_lsn = batch.max_lsn;
|
||||
let buf_size: u64 = batch.buffer_size() as u64;
|
||||
|
||||
|
||||
@@ -10,12 +10,13 @@ use std::sync::Arc;
|
||||
|
||||
use super::layer_manager::LayerManager;
|
||||
use super::{
|
||||
CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder,
|
||||
ImageLayerCreationMode, RecordedDuration, Timeline,
|
||||
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
|
||||
RecordedDuration, Timeline,
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use bytes::Bytes;
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::KEY_SIZE;
|
||||
@@ -29,6 +30,7 @@ use utils::id::TimelineId;
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::batch_split_writer::{
|
||||
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
|
||||
@@ -41,7 +43,7 @@ use crate::tenant::storage_layer::{
|
||||
use crate::tenant::timeline::ImageLayerCreationOutcome;
|
||||
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
|
||||
use crate::tenant::timeline::{Layer, ResidentLayer};
|
||||
use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
use pageserver_api::config::tenant_conf_defaults::{
|
||||
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
|
||||
@@ -62,15 +64,6 @@ use super::CompactionError;
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
const COMPACTION_DELTA_THRESHOLD: usize = 5;
|
||||
|
||||
/// A scheduled compaction task.
|
||||
pub(crate) struct ScheduledCompactionTask {
|
||||
pub options: CompactOptions,
|
||||
/// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
|
||||
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
/// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
|
||||
pub gc_block: Option<gc_block::Guard>,
|
||||
}
|
||||
|
||||
pub struct GcCompactionJobDescription {
|
||||
/// All layers to read in the compaction job
|
||||
selected_layers: Vec<Layer>,
|
||||
@@ -1181,12 +1174,11 @@ impl Timeline {
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
} else {
|
||||
let shard = self.shard_identity.shard_index();
|
||||
let owner = self.shard_identity.get_shard_number(&key);
|
||||
if cfg!(debug_assertions) {
|
||||
panic!("key {key} does not belong on shard {shard}, owned by {owner}");
|
||||
}
|
||||
debug!("dropping key {key} during compaction (it belongs on shard {owner})");
|
||||
debug!(
|
||||
"Dropping key {} during compaction (it belongs on shard {:?})",
|
||||
key,
|
||||
self.shard_identity.get_shard_number(&key)
|
||||
);
|
||||
}
|
||||
|
||||
if !new_layers.is_empty() {
|
||||
@@ -1754,113 +1746,22 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Split a gc-compaction job into multiple compaction jobs. Optimally, this function should return a vector of
|
||||
/// `GcCompactionJobDesc`. But we want to keep it simple on the tenant scheduling side without exposing too much
|
||||
/// ad-hoc information about gc compaction itself.
|
||||
pub(crate) async fn gc_compaction_split_jobs(
|
||||
pub(crate) async fn compact_with_gc(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
options: CompactOptions,
|
||||
) -> anyhow::Result<Vec<CompactOptions>> {
|
||||
if !options.sub_compaction {
|
||||
return Ok(vec![options]);
|
||||
}
|
||||
let compact_range = options.compact_range.clone().unwrap_or(CompactRange {
|
||||
start: Key::MIN,
|
||||
end: Key::MAX,
|
||||
});
|
||||
let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
|
||||
compact_below_lsn
|
||||
} else {
|
||||
*self.get_latest_gc_cutoff_lsn() // use the real gc cutoff
|
||||
};
|
||||
let mut compact_jobs = Vec::new();
|
||||
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
|
||||
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
|
||||
let Ok(partition) = self.partitioning.try_lock() else {
|
||||
bail!("failed to acquire partition lock");
|
||||
};
|
||||
let ((dense_ks, sparse_ks), _) = &*partition;
|
||||
// Truncate the key range to be within user specified compaction range.
|
||||
fn truncate_to(
|
||||
source_start: &Key,
|
||||
source_end: &Key,
|
||||
target_start: &Key,
|
||||
target_end: &Key,
|
||||
) -> Option<(Key, Key)> {
|
||||
let start = source_start.max(target_start);
|
||||
let end = source_end.min(target_end);
|
||||
if start < end {
|
||||
Some((*start, *end))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
let mut split_key_ranges = Vec::new();
|
||||
let ranges = dense_ks
|
||||
.parts
|
||||
.iter()
|
||||
.map(|partition| partition.ranges.iter())
|
||||
.chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
|
||||
.flatten()
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
for range in ranges.iter() {
|
||||
let Some((start, end)) = truncate_to(
|
||||
&range.start,
|
||||
&range.end,
|
||||
&compact_range.start,
|
||||
&compact_range.end,
|
||||
) else {
|
||||
continue;
|
||||
};
|
||||
split_key_ranges.push((start, end));
|
||||
}
|
||||
split_key_ranges.sort();
|
||||
let guard = self.layers.read().await;
|
||||
let layer_map = guard.layer_map()?;
|
||||
let mut current_start = None;
|
||||
// Split compaction job to about 2GB each
|
||||
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be configuration in the future
|
||||
let ranges_num = split_key_ranges.len();
|
||||
for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
|
||||
if current_start.is_none() {
|
||||
current_start = Some(start);
|
||||
}
|
||||
let start = current_start.unwrap();
|
||||
if start >= end {
|
||||
// We have already processed this partition.
|
||||
continue;
|
||||
}
|
||||
let res = layer_map.range_search(start..end, compact_below_lsn);
|
||||
let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
|
||||
if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
|
||||
let mut compact_options = options.clone();
|
||||
// Try to extend the compaction range so that we include at least one full layer file.
|
||||
let extended_end = res
|
||||
.found
|
||||
.keys()
|
||||
.map(|layer| layer.layer.key_range.end)
|
||||
.min();
|
||||
// It is possible that the search range does not contain any layer files when we reach the end of the loop.
|
||||
// In this case, we simply use the specified key range end.
|
||||
let end = if let Some(extended_end) = extended_end {
|
||||
extended_end.max(end)
|
||||
} else {
|
||||
end
|
||||
};
|
||||
info!(
|
||||
"splitting compaction job: {}..{}, estimated_size={}",
|
||||
start, end, total_size
|
||||
);
|
||||
compact_options.compact_range = Some(CompactRange { start, end });
|
||||
compact_options.compact_below_lsn = Some(compact_below_lsn);
|
||||
compact_options.sub_compaction = false;
|
||||
compact_jobs.push(compact_options);
|
||||
current_start = Some(end);
|
||||
}
|
||||
}
|
||||
drop(guard);
|
||||
Ok(compact_jobs)
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
self.partial_compact_with_gc(
|
||||
options
|
||||
.compact_range
|
||||
.map(|range| range.start..range.end)
|
||||
.unwrap_or_else(|| Key::MIN..Key::MAX),
|
||||
cancel,
|
||||
options.flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// An experimental compaction building block that combines compaction with garbage collection.
|
||||
@@ -1870,51 +1771,19 @@ impl Timeline {
|
||||
/// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
|
||||
/// and create delta layers with all deltas >= gc horizon.
|
||||
///
|
||||
/// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
|
||||
/// If `key_range` is provided, it will only compact the keys within the range, aka partial compaction.
|
||||
/// Partial compaction will read and process all layers overlapping with the key range, even if it might
|
||||
/// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
|
||||
/// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
|
||||
/// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
|
||||
/// part of the range.
|
||||
///
|
||||
/// If `options.compact_below_lsn` is provided, the compaction will only compact layers below or intersect with
|
||||
/// the LSN. Otherwise, it will use the gc cutoff by default.
|
||||
pub(crate) async fn compact_with_gc(
|
||||
pub(crate) async fn partial_compact_with_gc(
|
||||
self: &Arc<Self>,
|
||||
compaction_key_range: Range<Key>,
|
||||
cancel: &CancellationToken,
|
||||
options: CompactOptions,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
if options.sub_compaction {
|
||||
info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
|
||||
let jobs = self.gc_compaction_split_jobs(options).await?;
|
||||
let jobs_len = jobs.len();
|
||||
for (idx, job) in jobs.into_iter().enumerate() {
|
||||
info!(
|
||||
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
|
||||
idx + 1,
|
||||
jobs_len
|
||||
);
|
||||
self.compact_with_gc_inner(cancel, job, ctx).await?;
|
||||
}
|
||||
if jobs_len == 0 {
|
||||
info!("no jobs to run, skipping gc bottom-most compaction");
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
self.compact_with_gc_inner(cancel, options, ctx).await
|
||||
}
|
||||
|
||||
async fn compact_with_gc_inner(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
options: CompactOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
assert!(
|
||||
!options.sub_compaction,
|
||||
"sub-compaction should be handled by the outer function"
|
||||
);
|
||||
// Block other compaction/GC tasks from running for now. GC-compaction could run along
|
||||
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
|
||||
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
|
||||
@@ -1934,12 +1803,6 @@ impl Timeline {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let flags = options.flags;
|
||||
let compaction_key_range = options
|
||||
.compact_range
|
||||
.map(|range| range.start..range.end)
|
||||
.unwrap_or_else(|| Key::MIN..Key::MAX);
|
||||
|
||||
let dry_run = flags.contains(CompactFlags::DryRun);
|
||||
|
||||
if compaction_key_range == (Key::MIN..Key::MAX) {
|
||||
@@ -1963,22 +1826,7 @@ impl Timeline {
|
||||
let layers = guard.layer_map()?;
|
||||
let gc_info = self.gc_info.read().unwrap();
|
||||
let mut retain_lsns_below_horizon = Vec::new();
|
||||
let gc_cutoff = {
|
||||
// Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
|
||||
// Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
|
||||
// cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
|
||||
// to get the truth data.
|
||||
let real_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
|
||||
// The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
|
||||
// each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use
|
||||
// the real cutoff.
|
||||
let mut gc_cutoff = options.compact_below_lsn.unwrap_or(real_gc_cutoff);
|
||||
if gc_cutoff > real_gc_cutoff {
|
||||
warn!("provided compact_below_lsn={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
|
||||
gc_cutoff = real_gc_cutoff;
|
||||
}
|
||||
gc_cutoff
|
||||
};
|
||||
let gc_cutoff = gc_info.cutoffs.select_min();
|
||||
for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
|
||||
if lsn < &gc_cutoff {
|
||||
retain_lsns_below_horizon.push(*lsn);
|
||||
@@ -1998,7 +1846,7 @@ impl Timeline {
|
||||
.map(|desc| desc.get_lsn_range().end)
|
||||
.max()
|
||||
else {
|
||||
info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
|
||||
info!("no layers to compact with gc");
|
||||
return Ok(());
|
||||
};
|
||||
// Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
|
||||
@@ -2021,7 +1869,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
if selected_layers.is_empty() {
|
||||
info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compaction_key_range.start, compaction_key_range.end);
|
||||
info!("no layers to compact with gc");
|
||||
return Ok(());
|
||||
}
|
||||
retain_lsns_below_horizon.sort();
|
||||
@@ -2088,15 +1936,14 @@ impl Timeline {
|
||||
|
||||
// Step 1: construct a k-merge iterator over all layers.
|
||||
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
|
||||
// disable the check for now because we need to adjust the check for partial compactions, will enable later.
|
||||
// let layer_names = job_desc
|
||||
// .selected_layers
|
||||
// .iter()
|
||||
// .map(|layer| layer.layer_desc().layer_name())
|
||||
// .collect_vec();
|
||||
// if let Some(err) = check_valid_layermap(&layer_names) {
|
||||
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
|
||||
// }
|
||||
let layer_names = job_desc
|
||||
.selected_layers
|
||||
.iter()
|
||||
.map(|layer| layer.layer_desc().layer_name())
|
||||
.collect_vec();
|
||||
if let Some(err) = check_valid_layermap(&layer_names) {
|
||||
warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
|
||||
}
|
||||
// The maximum LSN we are processing in this compaction loop
|
||||
let end_lsn = job_desc
|
||||
.selected_layers
|
||||
@@ -2201,11 +2048,6 @@ impl Timeline {
|
||||
// This is not handled in the filter iterator because shard is determined by hash.
|
||||
// Therefore, it does not give us any performance benefit to do things like skip
|
||||
// a whole layer file as handling key spaces (ranges).
|
||||
if cfg!(debug_assertions) {
|
||||
let shard = self.shard_identity.shard_index();
|
||||
let owner = self.shard_identity.get_shard_number(&key);
|
||||
panic!("key {key} does not belong on shard {shard}, owned by {owner}");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if !job_desc.compaction_key_range.contains(&key) {
|
||||
|
||||
@@ -369,13 +369,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
let mut modification = timeline.begin_modification(Lsn(0));
|
||||
|
||||
if !records.is_empty() {
|
||||
timeline
|
||||
.metrics
|
||||
.wal_records_received
|
||||
.inc_by(records.len() as u64);
|
||||
}
|
||||
|
||||
for interpreted in records {
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
@@ -517,7 +510,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
|
||||
// Ingest the records without immediately committing them.
|
||||
timeline.metrics.wal_records_received.inc();
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
|
||||
@@ -582,21 +582,18 @@ impl WalIngest {
|
||||
forknum: FSM_FORKNUM,
|
||||
};
|
||||
|
||||
// Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
|
||||
// and instead of digging in the FSM bitmap format we just clear the whole page.
|
||||
let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
|
||||
let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
|
||||
if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
|
||||
&& self
|
||||
.shard
|
||||
.is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
|
||||
{
|
||||
if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
|
||||
// Tail of last remaining FSM page has to be zeroed.
|
||||
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
|
||||
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
|
||||
fsm_physical_page_no += 1;
|
||||
}
|
||||
// Truncate this shard's view of the FSM relation size, if it even has one.
|
||||
// TODO: re-examine the None case here wrt. sharding; should we error?
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
|
||||
if nblocks > fsm_physical_page_no {
|
||||
// check if something to do: FSM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
|
||||
.await?;
|
||||
}
|
||||
@@ -620,7 +617,7 @@ impl WalIngest {
|
||||
// tail bits in the last remaining map page, representing truncated heap
|
||||
// blocks, need to be cleared. This is not only tidy, but also necessary
|
||||
// because we don't get a chance to clear the bits if the heap is extended
|
||||
// again. Only do this on the shard that owns the page.
|
||||
// again.
|
||||
if (trunc_byte != 0 || trunc_offs != 0)
|
||||
&& self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
|
||||
{
|
||||
@@ -634,9 +631,10 @@ impl WalIngest {
|
||||
)?;
|
||||
vm_page_no += 1;
|
||||
}
|
||||
// Truncate this shard's view of the VM relation size, if it even has one.
|
||||
// TODO: re-examine the None case here wrt. sharding; should we error?
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
|
||||
if nblocks > vm_page_no {
|
||||
// check if something to do: VM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, vm_page_no, ctx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -610,9 +610,6 @@ prefetch_read(PrefetchRequest *slot)
|
||||
{
|
||||
NeonResponse *response;
|
||||
MemoryContext old;
|
||||
BufferTag buftag;
|
||||
shardno_t shard_no;
|
||||
uint64 my_ring_index;
|
||||
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->response == NULL);
|
||||
@@ -626,29 +623,11 @@ prefetch_read(PrefetchRequest *slot)
|
||||
slot->status, slot->response,
|
||||
(long)slot->my_ring_index, (long)MyPState->ring_receive);
|
||||
|
||||
/*
|
||||
* Copy the request info so that if an error happens and the prefetch
|
||||
* queue is flushed during the receive call, we can print the original
|
||||
* values in the error message
|
||||
*/
|
||||
buftag = slot->buftag;
|
||||
shard_no = slot->shard_no;
|
||||
my_ring_index = slot->my_ring_index;
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive(shard_no);
|
||||
response = (NeonResponse *) page_server->receive(slot->shard_no);
|
||||
MemoryContextSwitchTo(old);
|
||||
if (response)
|
||||
{
|
||||
/* The slot should still be valid */
|
||||
if (slot->status != PRFS_REQUESTED ||
|
||||
slot->response != NULL ||
|
||||
slot->my_ring_index != MyPState->ring_receive)
|
||||
neon_shard_log(shard_no, ERROR,
|
||||
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
|
||||
slot->status, slot->response,
|
||||
(long) slot->my_ring_index, (long) MyPState->ring_receive);
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
@@ -663,15 +642,11 @@ prefetch_read(PrefetchRequest *slot)
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Note: The slot might no longer be valid, if the connection was lost
|
||||
* and the prefetch queue was flushed during the receive call
|
||||
*/
|
||||
neon_shard_log(shard_no, LOG,
|
||||
neon_shard_log(slot->shard_no, LOG,
|
||||
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
|
||||
(long) my_ring_index,
|
||||
RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)),
|
||||
buftag.forkNum, buftag.blockNum);
|
||||
(long)slot->my_ring_index,
|
||||
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
|
||||
slot->buftag.forkNum, slot->buftag.blockNum);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
126
proxy/src/cache/project_info.rs
vendored
126
proxy/src/cache/project_info.rs
vendored
@@ -16,15 +16,12 @@ use super::{Cache, Cached};
|
||||
use crate::auth::IpPattern;
|
||||
use crate::config::ProjectInfoCacheOptions;
|
||||
use crate::control_plane::AuthSecret;
|
||||
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
|
||||
use crate::intern::{EndpointIdInt, ProjectIdInt, RoleNameInt};
|
||||
use crate::types::{EndpointId, RoleName};
|
||||
|
||||
#[async_trait]
|
||||
pub(crate) trait ProjectInfoCache {
|
||||
fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt);
|
||||
fn invalidate_allowed_vpc_endpoint_ids_for_projects(&self, project_ids: Vec<ProjectIdInt>);
|
||||
fn invalidate_allowed_vpc_endpoint_ids_for_org(&self, account_id: AccountIdInt);
|
||||
fn invalidate_block_public_or_vpc_access_for_project(&self, project_id: ProjectIdInt);
|
||||
fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt);
|
||||
async fn decrement_active_listeners(&self);
|
||||
async fn increment_active_listeners(&self);
|
||||
@@ -54,8 +51,6 @@ impl<T> From<T> for Entry<T> {
|
||||
struct EndpointInfo {
|
||||
secret: std::collections::HashMap<RoleNameInt, Entry<Option<AuthSecret>>>,
|
||||
allowed_ips: Option<Entry<Arc<Vec<IpPattern>>>>,
|
||||
block_public_or_vpc_access: Option<Entry<(bool, bool)>>,
|
||||
allowed_vpc_endpoint_ids: Option<Entry<Arc<Vec<String>>>>,
|
||||
}
|
||||
|
||||
impl EndpointInfo {
|
||||
@@ -97,51 +92,6 @@ impl EndpointInfo {
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn get_allowed_vpc_endpoint_ids(
|
||||
&self,
|
||||
valid_since: Instant,
|
||||
ignore_cache_since: Option<Instant>,
|
||||
) -> Option<(Arc<Vec<String>>, bool)> {
|
||||
if let Some(allowed_vpc_endpoint_ids) = &self.allowed_vpc_endpoint_ids {
|
||||
if valid_since < allowed_vpc_endpoint_ids.created_at {
|
||||
return Some((
|
||||
allowed_vpc_endpoint_ids.value.clone(),
|
||||
Self::check_ignore_cache(
|
||||
ignore_cache_since,
|
||||
allowed_vpc_endpoint_ids.created_at,
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn get_block_public_or_vpc_access(
|
||||
&self,
|
||||
valid_since: Instant,
|
||||
ignore_cache_since: Option<Instant>,
|
||||
) -> Option<((bool, bool), bool)> {
|
||||
if let Some(block_public_or_vpc_access) = &self.block_public_or_vpc_access {
|
||||
if valid_since < block_public_or_vpc_access.created_at {
|
||||
return Some((
|
||||
block_public_or_vpc_access.value.clone(),
|
||||
Self::check_ignore_cache(
|
||||
ignore_cache_since,
|
||||
block_public_or_vpc_access.created_at,
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn invalidate_block_public_or_vpc_access(&mut self) {
|
||||
self.block_public_or_vpc_access = None;
|
||||
}
|
||||
pub(crate) fn invalidate_allowed_vpc_endpoint_ids(&mut self) {
|
||||
self.allowed_vpc_endpoint_ids = None;
|
||||
}
|
||||
pub(crate) fn invalidate_allowed_ips(&mut self) {
|
||||
self.allowed_ips = None;
|
||||
}
|
||||
@@ -161,8 +111,6 @@ pub struct ProjectInfoCacheImpl {
|
||||
cache: DashMap<EndpointIdInt, EndpointInfo>,
|
||||
|
||||
project2ep: DashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
|
||||
// FIXME(stefan): we need a way to GC the account2ep map.
|
||||
account2ep: DashMap<AccountIdInt, HashSet<EndpointIdInt>>,
|
||||
config: ProjectInfoCacheOptions,
|
||||
|
||||
start_time: Instant,
|
||||
@@ -172,59 +120,6 @@ pub struct ProjectInfoCacheImpl {
|
||||
|
||||
#[async_trait]
|
||||
impl ProjectInfoCache for ProjectInfoCacheImpl {
|
||||
fn invalidate_allowed_vpc_endpoint_ids_for_projects(&self, project_ids: Vec<ProjectIdInt>) {
|
||||
info!(
|
||||
"invalidating allowed vpc endpoint ids for projects `{}`",
|
||||
project_ids.join(", ")
|
||||
);
|
||||
for project_id in project_ids {
|
||||
let endpoints = self
|
||||
.project2ep
|
||||
.get(&project_id)
|
||||
.map(|kv| kv.value().clone())
|
||||
.unwrap_or_default();
|
||||
for endpoint_id in endpoints {
|
||||
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
|
||||
endpoint_info.invalidate_allowed_vpc_endpoint_ids();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn invalidate_allowed_vpc_endpoint_ids_for_org(&self, account_id: AccountIdInt) {
|
||||
info!(
|
||||
"invalidating allowed vpc endpoint ids for org `{}`",
|
||||
account_id
|
||||
);
|
||||
let endpoints = self
|
||||
.account2ep
|
||||
.get(&account_id)
|
||||
.map(|kv| kv.value().clone())
|
||||
.unwrap_or_default();
|
||||
for endpoint_id in endpoints {
|
||||
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
|
||||
endpoint_info.invalidate_allowed_vpc_endpoint_ids();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn invalidate_block_public_or_vpc_access_for_project(&self, project_id: ProjectIdInt) {
|
||||
info!(
|
||||
"invalidating block public or vpc access for project `{}`",
|
||||
project_id
|
||||
);
|
||||
let endpoints = self
|
||||
.project2ep
|
||||
.get(&project_id)
|
||||
.map(|kv| kv.value().clone())
|
||||
.unwrap_or_default();
|
||||
for endpoint_id in endpoints {
|
||||
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
|
||||
endpoint_info.invalidate_block_public_or_vpc_access();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn invalidate_allowed_ips_for_project(&self, project_id: ProjectIdInt) {
|
||||
info!("invalidating allowed ips for project `{}`", project_id);
|
||||
let endpoints = self
|
||||
@@ -283,7 +178,6 @@ impl ProjectInfoCacheImpl {
|
||||
Self {
|
||||
cache: DashMap::new(),
|
||||
project2ep: DashMap::new(),
|
||||
account2ep: DashMap::new(),
|
||||
config,
|
||||
ttl_disabled_since_us: AtomicU64::new(u64::MAX),
|
||||
start_time: Instant::now(),
|
||||
@@ -332,7 +226,6 @@ impl ProjectInfoCacheImpl {
|
||||
}
|
||||
Some(Cached::new_uncached(value))
|
||||
}
|
||||
|
||||
pub(crate) fn insert_role_secret(
|
||||
&self,
|
||||
project_id: ProjectIdInt,
|
||||
@@ -363,16 +256,6 @@ impl ProjectInfoCacheImpl {
|
||||
self.insert_project2endpoint(project_id, endpoint_id);
|
||||
self.cache.entry(endpoint_id).or_default().allowed_ips = Some(allowed_ips.into());
|
||||
}
|
||||
pub(crate) fn insert_vpc_allowed_endpoint_ids(&self, account_id: AccountIdInt, project_id: ProjectIdInt, endpoint_id: EndpointIdInt, vpc_allowed_endpoint_ids: HashSet<EndpointIdInt>) {
|
||||
if self.cache.len() >= self.config.size {
|
||||
// If there are too many entries, wait until the next gc cycle.
|
||||
return;
|
||||
}
|
||||
self.insert_account2endpoint(account_id, endpoint_id);
|
||||
self.insert_project2endpoint(project_id, endpoint_id);
|
||||
self.cache.entry(endpoint_id).or_default().vpc_allowed_endpoint_ids = Some(vpc_allowed_endpoint_ids);
|
||||
}
|
||||
}
|
||||
fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) {
|
||||
if let Some(mut endpoints) = self.project2ep.get_mut(&project_id) {
|
||||
endpoints.insert(endpoint_id);
|
||||
@@ -381,13 +264,6 @@ impl ProjectInfoCacheImpl {
|
||||
.insert(project_id, HashSet::from([endpoint_id]));
|
||||
}
|
||||
}
|
||||
fn insert_account2endpoint(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt) {
|
||||
if let Some(mut endpoints) = self.account2ep.get_mut(&account_id) {
|
||||
endpoints.insert(endpoint_id);
|
||||
} else {
|
||||
self.account2ep.insert(account_id, HashSet::from([endpoint_id]));
|
||||
}
|
||||
}
|
||||
fn get_cache_times(&self) -> (Instant, Option<Instant>) {
|
||||
let mut valid_since = Instant::now() - self.config.ttl;
|
||||
// Only ignore cache if ttl is disabled.
|
||||
|
||||
@@ -115,8 +115,7 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
|
||||
};
|
||||
if !self.limiter.lock().unwrap().check(subnet_key, 1) {
|
||||
// log only the subnet part of the IP address to know which subnet is rate limited
|
||||
tracing::warn!("Rate limit exceeded. Skipping cancellation message, {subnet_key}");
|
||||
tracing::debug!("Rate limit exceeded. Skipping cancellation message");
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.cancellation_requests_total
|
||||
|
||||
@@ -163,36 +163,32 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
let do_handshake = handshake(ctx, stream, tls, record_handshake_error);
|
||||
|
||||
let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
|
||||
.await??
|
||||
{
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
|
||||
cancel_span.follows_from(tracing::Span::current());
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.instrument(cancel_span)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
let (mut stream, params) =
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
drop(pause);
|
||||
|
||||
ctx.set_db_options(params.clone());
|
||||
|
||||
@@ -238,8 +238,6 @@ pub(crate) struct GetEndpointAccessControl {
|
||||
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
|
||||
pub(crate) project_id: Option<ProjectIdInt>,
|
||||
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<EndpointIdInt>>,
|
||||
pub(crate) block_public_connections: Option<bool>,
|
||||
pub(crate) block_vpc_connections: Option<bool>,
|
||||
}
|
||||
|
||||
// Manually implement debug to omit sensitive info.
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::sync::OnceLock;
|
||||
use lasso::{Capacity, MemoryLimits, Spur, ThreadedRodeo};
|
||||
use rustc_hash::FxHasher;
|
||||
|
||||
use crate::types::{AccountId, BranchId, EndpointId, ProjectId, RoleName};
|
||||
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
|
||||
|
||||
pub trait InternId: Sized + 'static {
|
||||
fn get_interner() -> &'static StringInterner<Self>;
|
||||
@@ -206,26 +206,6 @@ impl From<ProjectId> for ProjectIdInt {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub struct AccountIdTag;
|
||||
impl InternId for AccountIdTag {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
static ROLE_NAMES: OnceLock<StringInterner<AccountIdTag>> = OnceLock::new();
|
||||
ROLE_NAMES.get_or_init(Default::default)
|
||||
}
|
||||
}
|
||||
pub type AccountIdInt = InternedString<AccountIdTag>;
|
||||
impl From<&AccountId> for AccountIdInt {
|
||||
fn from(value: &AccountId) -> Self {
|
||||
AccountIdTag::get_interner().get_or_intern(value)
|
||||
}
|
||||
}
|
||||
impl From<AccountId> for AccountIdInt {
|
||||
fn from(value: AccountId) -> Self {
|
||||
AccountIdTag::get_interner().get_or_intern(&value)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::OnceLock;
|
||||
|
||||
@@ -556,9 +556,6 @@ pub enum RedisEventsCount {
|
||||
CancelSession,
|
||||
PasswordUpdate,
|
||||
AllowedIpsUpdate,
|
||||
AllowedVpcEndpointIdsUpdateForProjects,
|
||||
AllowedVpcEndpointIdsUpdateForAllProjectsInOrg,
|
||||
BlockPublicOrVpcAccessUpdate,
|
||||
}
|
||||
|
||||
pub struct ThreadPoolWorkers(usize);
|
||||
|
||||
@@ -272,36 +272,32 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error);
|
||||
|
||||
let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
|
||||
.await??
|
||||
{
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?session_id);
|
||||
cancel_span.follows_from(tracing::Span::current());
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.instrument(cancel_span)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
let (mut stream, params) =
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
drop(pause);
|
||||
|
||||
ctx.set_db_options(params.clone());
|
||||
|
||||
@@ -11,9 +11,8 @@ use uuid::Uuid;
|
||||
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
|
||||
use crate::cache::project_info::ProjectInfoCache;
|
||||
use crate::cancellation::{CancelMap, CancellationHandler};
|
||||
use crate::intern::{AccountIdInt, ProjectIdInt, RoleNameInt};
|
||||
use crate::intern::{ProjectIdInt, RoleNameInt};
|
||||
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
|
||||
use tracing::Instrument;
|
||||
|
||||
const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
|
||||
pub(crate) const PROXY_CHANNEL_NAME: &str = "neondb-proxy-to-proxy-updates";
|
||||
@@ -39,27 +38,6 @@ pub(crate) enum Notification {
|
||||
AllowedIpsUpdate {
|
||||
allowed_ips_update: AllowedIpsUpdate,
|
||||
},
|
||||
#[serde(
|
||||
rename = "/allowed_vpc_endpoint_ids_updated_for_projects",
|
||||
deserialize_with = "deserialize_json_string"
|
||||
)]
|
||||
AllowedVpcEndpointIdsUpdateForProjects {
|
||||
allowed_vpc_endpoint_ids_update_for_projects: AllowedVpcEndpointIdsUpdateForProjects,
|
||||
},
|
||||
#[serde(
|
||||
rename = "/allowed_vpc_endpoint_ids_updated_for_org",
|
||||
deserialize_with = "deserialize_json_string"
|
||||
)]
|
||||
AllowedVpcEndpointIdsUpdateForAllProjectsInOrg {
|
||||
allowed_vpc_endpoint_ids_update_for_org: AllowedVpcEndpointIdsUpdateForAllProjectsInOrg,
|
||||
},
|
||||
#[serde(
|
||||
rename = "/block_public_or_vpc_access_updated",
|
||||
deserialize_with = "deserialize_json_string"
|
||||
)]
|
||||
BlockPublicOrVpcAccessUpdate {
|
||||
block_public_or_vpc_access_update: BlockPublicOrVpcAccessUpdate,
|
||||
},
|
||||
#[serde(
|
||||
rename = "/password_updated",
|
||||
deserialize_with = "deserialize_json_string"
|
||||
@@ -72,22 +50,6 @@ pub(crate) enum Notification {
|
||||
pub(crate) struct AllowedIpsUpdate {
|
||||
project_id: ProjectIdInt,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct AllowedVpcEndpointIdsUpdateForProjects {
|
||||
project_ids: Vec<ProjectIdInt>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct AllowedVpcEndpointIdsUpdateForAllProjectsInOrg {
|
||||
account_id: AccountIdInt,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct BlockPublicOrVpcAccessUpdate {
|
||||
project_id: ProjectIdInt,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct PasswordUpdate {
|
||||
project_id: ProjectIdInt,
|
||||
@@ -181,8 +143,6 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
let peer_addr = cancel_session
|
||||
.peer_addr
|
||||
.unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
|
||||
let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?cancel_session.session_id);
|
||||
cancel_span.follows_from(tracing::Span::current());
|
||||
// This instance of cancellation_handler doesn't have a RedisPublisherClient so it can't publish the message.
|
||||
match self
|
||||
.cancellation_handler
|
||||
@@ -192,7 +152,6 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
peer_addr,
|
||||
cancel_session.peer_addr.is_some(),
|
||||
)
|
||||
.instrument(cancel_span)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
@@ -201,11 +160,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
}
|
||||
}
|
||||
}
|
||||
Notification::AllowedIpsUpdate { .. }
|
||||
| Notification::PasswordUpdate { .. }
|
||||
| Notification::AllowedVpcEndpointIdsUpdateForProjects { .. }
|
||||
| Notification::AllowedVpcEndpointIdsUpdateForAllProjectsInOrg { .. }
|
||||
| Notification::BlockPublicOrVpcAccessUpdate { .. } => {
|
||||
Notification::AllowedIpsUpdate { .. } | Notification::PasswordUpdate { .. } => {
|
||||
invalidate_cache(self.cache.clone(), msg.clone());
|
||||
if matches!(msg, Notification::AllowedIpsUpdate { .. }) {
|
||||
Metrics::get()
|
||||
@@ -217,27 +172,6 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
.proxy
|
||||
.redis_events_count
|
||||
.inc(RedisEventsCount::PasswordUpdate);
|
||||
} else if matches!(
|
||||
msg,
|
||||
Notification::AllowedVpcEndpointIdsUpdateForProjects { .. }
|
||||
) {
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.redis_events_count
|
||||
.inc(RedisEventsCount::AllowedVpcEndpointIdsUpdateForProjects);
|
||||
} else if matches!(
|
||||
msg,
|
||||
Notification::AllowedVpcEndpointIdsUpdateForAllProjectsInOrg { .. }
|
||||
) {
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.redis_events_count
|
||||
.inc(RedisEventsCount::AllowedVpcEndpointIdsUpdateForAllProjectsInOrg);
|
||||
} else if matches!(msg, Notification::BlockPublicOrVpcAccessUpdate { .. }) {
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.redis_events_count
|
||||
.inc(RedisEventsCount::BlockPublicOrVpcAccessUpdate);
|
||||
}
|
||||
// It might happen that the invalid entry is on the way to be cached.
|
||||
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
|
||||
@@ -259,21 +193,6 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
|
||||
Notification::AllowedIpsUpdate { allowed_ips_update } => {
|
||||
cache.invalidate_allowed_ips_for_project(allowed_ips_update.project_id);
|
||||
}
|
||||
Notification::AllowedVpcEndpointIdsUpdateForProjects {
|
||||
allowed_vpc_endpoint_ids_update_for_projects,
|
||||
} => cache.invalidate_allowed_vpc_endpoint_ids_for_projects(
|
||||
allowed_vpc_endpoint_ids_update_for_projects.project_ids,
|
||||
),
|
||||
Notification::AllowedVpcEndpointIdsUpdateForAllProjectsInOrg {
|
||||
allowed_vpc_endpoint_ids_update_for_org,
|
||||
} => cache.invalidate_allowed_vpc_endpoint_ids_for_org(
|
||||
allowed_vpc_endpoint_ids_update_for_org.account_id,
|
||||
),
|
||||
Notification::BlockPublicOrVpcAccessUpdate {
|
||||
block_public_or_vpc_access_update,
|
||||
} => cache.invalidate_block_public_or_vpc_access_for_project(
|
||||
block_public_or_vpc_access_update.project_id,
|
||||
),
|
||||
Notification::PasswordUpdate { password_update } => cache
|
||||
.invalidate_role_secret_for_project(
|
||||
password_update.project_id,
|
||||
|
||||
@@ -97,8 +97,6 @@ smol_str_wrapper!(EndpointId);
|
||||
smol_str_wrapper!(BranchId);
|
||||
// 90% of project strings are 23 characters or less.
|
||||
smol_str_wrapper!(ProjectId);
|
||||
// 90% of account strings are 23 characters or less.
|
||||
smol_str_wrapper!(AccountId);
|
||||
|
||||
// will usually equal endpoint ID
|
||||
smol_str_wrapper!(EndpointCacheKey);
|
||||
|
||||
@@ -83,20 +83,14 @@ impl Env {
|
||||
node_id: NodeId,
|
||||
ttid: TenantTimelineId,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let conf = Arc::new(self.make_conf(node_id));
|
||||
let conf = self.make_conf(node_id);
|
||||
let timeline_dir = get_timeline_dir(&conf, &ttid);
|
||||
let remote_path = remote_timeline_path(&ttid)?;
|
||||
|
||||
let safekeeper = self.make_safekeeper(node_id, ttid).await?;
|
||||
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
|
||||
|
||||
let timeline = Timeline::new(
|
||||
ttid,
|
||||
&timeline_dir,
|
||||
&remote_path,
|
||||
shared_state,
|
||||
conf.clone(),
|
||||
);
|
||||
let timeline = Timeline::new(ttid, &timeline_dir, &remote_path, shared_state);
|
||||
timeline.bootstrap(
|
||||
&mut timeline.write_shared_state().await,
|
||||
&conf,
|
||||
|
||||
@@ -338,7 +338,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
let conf = Arc::new(SafeKeeperConf {
|
||||
let conf = SafeKeeperConf {
|
||||
workdir,
|
||||
my_id: id,
|
||||
listen_pg_addr: args.listen_pg,
|
||||
@@ -368,7 +368,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
control_file_save_interval: args.control_file_save_interval,
|
||||
partial_backup_concurrency: args.partial_backup_concurrency,
|
||||
eviction_min_resident: args.eviction_min_resident,
|
||||
});
|
||||
};
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
let _sentry_guard = init_sentry(
|
||||
@@ -382,7 +382,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
/// complete, e.g. panicked, inner is error produced by task itself.
|
||||
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
|
||||
|
||||
async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
// fsync the datadir to make sure we have a consistent state on disk.
|
||||
if !conf.no_sync {
|
||||
let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
|
||||
@@ -428,11 +428,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
e
|
||||
})?;
|
||||
|
||||
let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));
|
||||
|
||||
// Register metrics collector for active timelines. It's important to do this
|
||||
// after daemonizing, otherwise process collector will be upset.
|
||||
let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
|
||||
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
|
||||
metrics::register_internal(Box::new(timeline_collector))?;
|
||||
|
||||
wal_backup::init_remote_storage(&conf).await;
|
||||
@@ -449,8 +447,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
.then(|| Handle::try_current().expect("no runtime in main"));
|
||||
|
||||
// Load all timelines from disk to memory.
|
||||
global_timelines.init().await?;
|
||||
GlobalTimelines::init(conf.clone()).await?;
|
||||
|
||||
let conf_ = conf.clone();
|
||||
// Run everything in current thread rt, if asked.
|
||||
if conf.current_thread_runtime {
|
||||
info!("running in current thread runtime");
|
||||
@@ -460,16 +459,14 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||
.spawn(wal_service::task_main(
|
||||
conf.clone(),
|
||||
conf_,
|
||||
pg_listener,
|
||||
Scope::SafekeeperData,
|
||||
global_timelines.clone(),
|
||||
))
|
||||
// wrap with task name for error reporting
|
||||
.map(|res| ("WAL service main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_service_handle));
|
||||
|
||||
let global_timelines_ = global_timelines.clone();
|
||||
let timeline_housekeeping_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||
@@ -477,45 +474,40 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
|
||||
loop {
|
||||
tokio::time::sleep(TOMBSTONE_TTL).await;
|
||||
global_timelines_.housekeeping(&TOMBSTONE_TTL);
|
||||
GlobalTimelines::housekeeping(&TOMBSTONE_TTL);
|
||||
}
|
||||
})
|
||||
.map(|res| ("Timeline map housekeeping".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(timeline_housekeeping_handle));
|
||||
|
||||
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
|
||||
let conf_ = conf.clone();
|
||||
let wal_service_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||
.spawn(wal_service::task_main(
|
||||
conf.clone(),
|
||||
conf_,
|
||||
pg_listener_tenant_only,
|
||||
Scope::Tenant,
|
||||
global_timelines.clone(),
|
||||
))
|
||||
// wrap with task name for error reporting
|
||||
.map(|res| ("WAL service tenant only main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_service_handle));
|
||||
}
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let http_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| HTTP_RUNTIME.handle())
|
||||
.spawn(http::task_main(
|
||||
conf.clone(),
|
||||
http_listener,
|
||||
global_timelines.clone(),
|
||||
))
|
||||
.spawn(http::task_main(conf_, http_listener))
|
||||
.map(|res| ("HTTP service main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(http_handle));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let broker_task_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| BROKER_RUNTIME.handle())
|
||||
.spawn(
|
||||
broker::task_main(conf.clone(), global_timelines.clone())
|
||||
.instrument(info_span!("broker")),
|
||||
)
|
||||
.spawn(broker::task_main(conf_).instrument(info_span!("broker")))
|
||||
.map(|res| ("broker main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(broker_task_handle));
|
||||
|
||||
|
||||
@@ -39,17 +39,14 @@ const RETRY_INTERVAL_MSEC: u64 = 1000;
|
||||
const PUSH_INTERVAL_MSEC: u64 = 1000;
|
||||
|
||||
/// Push once in a while data about all active timelines to the broker.
|
||||
async fn push_loop(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> anyhow::Result<()> {
|
||||
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
if conf.disable_periodic_broker_push {
|
||||
info!("broker push_loop is disabled, doing nothing...");
|
||||
futures::future::pending::<()>().await; // sleep forever
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let active_timelines_set = global_timelines.get_global_broker_active_set();
|
||||
let active_timelines_set = GlobalTimelines::get_global_broker_active_set();
|
||||
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
@@ -90,13 +87,8 @@ async fn push_loop(
|
||||
|
||||
/// Subscribe and fetch all the interesting data from the broker.
|
||||
#[instrument(name = "broker_pull", skip_all)]
|
||||
async fn pull_loop(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
stats: Arc<BrokerStats>,
|
||||
) -> Result<()> {
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
|
||||
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
|
||||
|
||||
// TODO: subscribe only to local timelines instead of all
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
@@ -121,7 +113,7 @@ async fn pull_loop(
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
|
||||
let ttid = parse_proto_ttid(proto_ttid)?;
|
||||
if let Ok(tli) = global_timelines.get(ttid) {
|
||||
if let Ok(tli) = GlobalTimelines::get(ttid) {
|
||||
// Note that we also receive *our own* info. That's
|
||||
// important, as it is used as an indication of live
|
||||
// connection to the broker.
|
||||
@@ -143,11 +135,7 @@ async fn pull_loop(
|
||||
|
||||
/// Process incoming discover requests. This is done in a separate task to avoid
|
||||
/// interfering with the normal pull/push loops.
|
||||
async fn discover_loop(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
stats: Arc<BrokerStats>,
|
||||
) -> Result<()> {
|
||||
async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
|
||||
@@ -183,7 +171,7 @@ async fn discover_loop(
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
|
||||
let ttid = parse_proto_ttid(proto_ttid)?;
|
||||
if let Ok(tli) = global_timelines.get(ttid) {
|
||||
if let Ok(tli) = GlobalTimelines::get(ttid) {
|
||||
// we received a discovery request for a timeline we know about
|
||||
discover_counter.inc();
|
||||
|
||||
@@ -222,10 +210,7 @@ async fn discover_loop(
|
||||
bail!("end of stream");
|
||||
}
|
||||
|
||||
pub async fn task_main(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> anyhow::Result<()> {
|
||||
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
info!("started, broker endpoint {:?}", conf.broker_endpoint);
|
||||
|
||||
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
|
||||
@@ -276,13 +261,13 @@ pub async fn task_main(
|
||||
},
|
||||
_ = ticker.tick() => {
|
||||
if push_handle.is_none() {
|
||||
push_handle = Some(tokio::spawn(push_loop(conf.clone(), global_timelines.clone())));
|
||||
push_handle = Some(tokio::spawn(push_loop(conf.clone())));
|
||||
}
|
||||
if pull_handle.is_none() {
|
||||
pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), global_timelines.clone(), stats.clone())));
|
||||
pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
|
||||
}
|
||||
if discover_handle.is_none() {
|
||||
discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), global_timelines.clone(), stats.clone())));
|
||||
discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
|
||||
}
|
||||
},
|
||||
_ = &mut stats_task => {}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
|
||||
use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
|
||||
use std::sync::Arc;
|
||||
use tokio::{
|
||||
fs::OpenOptions,
|
||||
io::{AsyncSeekExt, AsyncWriteExt},
|
||||
@@ -12,7 +14,7 @@ use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
use crate::{
|
||||
control_file::FileStorage,
|
||||
state::TimelinePersistentState,
|
||||
timeline::{TimelineError, WalResidentTimeline},
|
||||
timeline::{Timeline, TimelineError, WalResidentTimeline},
|
||||
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
|
||||
wal_backup::copy_s3_segments,
|
||||
wal_storage::{wal_file_paths, WalReader},
|
||||
@@ -23,19 +25,16 @@ use crate::{
|
||||
const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
|
||||
|
||||
pub struct Request {
|
||||
pub source_ttid: TenantTimelineId,
|
||||
pub source: Arc<Timeline>,
|
||||
pub until_lsn: Lsn,
|
||||
pub destination_ttid: TenantTimelineId,
|
||||
}
|
||||
|
||||
pub async fn handle_request(
|
||||
request: Request,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<()> {
|
||||
pub async fn handle_request(request: Request) -> Result<()> {
|
||||
// TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
|
||||
// if LSN will point to the middle of a WAL record, timeline will be in "broken" state
|
||||
|
||||
match global_timelines.get(request.destination_ttid) {
|
||||
match GlobalTimelines::get(request.destination_ttid) {
|
||||
// timeline already exists. would be good to check that this timeline is the copy
|
||||
// of the source timeline, but it isn't obvious how to do that
|
||||
Ok(_) => return Ok(()),
|
||||
@@ -47,10 +46,9 @@ pub async fn handle_request(
|
||||
}
|
||||
}
|
||||
|
||||
let source = global_timelines.get(request.source_ttid)?;
|
||||
let source_tli = source.wal_residence_guard().await?;
|
||||
let source_tli = request.source.wal_residence_guard().await?;
|
||||
|
||||
let conf = &global_timelines.get_global_config();
|
||||
let conf = &GlobalTimelines::get_global_config();
|
||||
let ttid = request.destination_ttid;
|
||||
|
||||
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
|
||||
@@ -129,7 +127,7 @@ pub async fn handle_request(
|
||||
|
||||
copy_s3_segments(
|
||||
wal_seg_size,
|
||||
&request.source_ttid,
|
||||
&request.source.ttid,
|
||||
&request.destination_ttid,
|
||||
first_segment,
|
||||
first_ondisk_segment,
|
||||
@@ -160,9 +158,7 @@ pub async fn handle_request(
|
||||
|
||||
// now we have a ready timeline in a temp directory
|
||||
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
|
||||
global_timelines
|
||||
.load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
|
||||
.await?;
|
||||
GlobalTimelines::load_temp_timeline(request.destination_ttid, &tli_dir_path, true).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -207,23 +207,23 @@ pub struct FileInfo {
|
||||
}
|
||||
|
||||
/// Build debug dump response, using the provided [`Args`] filters.
|
||||
pub async fn build(args: Args, global_timelines: Arc<GlobalTimelines>) -> Result<Response> {
|
||||
pub async fn build(args: Args) -> Result<Response> {
|
||||
let start_time = Utc::now();
|
||||
let timelines_count = global_timelines.timelines_count();
|
||||
let config = global_timelines.get_global_config();
|
||||
let timelines_count = GlobalTimelines::timelines_count();
|
||||
let config = GlobalTimelines::get_global_config();
|
||||
|
||||
let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
|
||||
// If both tenant_id and timeline_id are specified, we can just get the
|
||||
// timeline directly, without taking a snapshot of the whole list.
|
||||
let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
|
||||
if let Ok(tli) = global_timelines.get(ttid) {
|
||||
if let Ok(tli) = GlobalTimelines::get(ttid) {
|
||||
vec![tli]
|
||||
} else {
|
||||
vec![]
|
||||
}
|
||||
} else {
|
||||
// Otherwise, take a snapshot of the whole list.
|
||||
global_timelines.get_all()
|
||||
GlobalTimelines::get_all()
|
||||
};
|
||||
|
||||
let mut timelines = Vec::new();
|
||||
@@ -344,12 +344,12 @@ fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
|
||||
|
||||
/// Converts SafeKeeperConf to Config, filtering out the fields that are not
|
||||
/// supposed to be exposed.
|
||||
fn build_config(config: Arc<SafeKeeperConf>) -> Config {
|
||||
fn build_config(config: SafeKeeperConf) -> Config {
|
||||
Config {
|
||||
id: config.my_id,
|
||||
workdir: config.workdir.clone().into(),
|
||||
listen_pg_addr: config.listen_pg_addr.clone(),
|
||||
listen_http_addr: config.listen_http_addr.clone(),
|
||||
workdir: config.workdir.into(),
|
||||
listen_pg_addr: config.listen_pg_addr,
|
||||
listen_http_addr: config.listen_http_addr,
|
||||
no_sync: config.no_sync,
|
||||
max_offloader_lag_bytes: config.max_offloader_lag_bytes,
|
||||
wal_backup_enabled: config.wal_backup_enabled,
|
||||
|
||||
@@ -33,7 +33,7 @@ use utils::{
|
||||
|
||||
/// Safekeeper handler of postgres commands
|
||||
pub struct SafekeeperPostgresHandler {
|
||||
pub conf: Arc<SafeKeeperConf>,
|
||||
pub conf: SafeKeeperConf,
|
||||
/// assigned application name
|
||||
pub appname: Option<String>,
|
||||
pub tenant_id: Option<TenantId>,
|
||||
@@ -43,7 +43,6 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub protocol: Option<PostgresClientProtocol>,
|
||||
/// Unique connection id is logged in spans for observability.
|
||||
pub conn_id: ConnectionId,
|
||||
pub global_timelines: Arc<GlobalTimelines>,
|
||||
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
|
||||
auth: Option<(Scope, Arc<JwtAuth>)>,
|
||||
claims: Option<Claims>,
|
||||
@@ -315,11 +314,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
|
||||
impl SafekeeperPostgresHandler {
|
||||
pub fn new(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
conf: SafeKeeperConf,
|
||||
conn_id: u32,
|
||||
io_metrics: Option<TrafficMetrics>,
|
||||
auth: Option<(Scope, Arc<JwtAuth>)>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Self {
|
||||
SafekeeperPostgresHandler {
|
||||
conf,
|
||||
@@ -333,7 +331,6 @@ impl SafekeeperPostgresHandler {
|
||||
claims: None,
|
||||
auth,
|
||||
io_metrics,
|
||||
global_timelines,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -363,7 +360,7 @@ impl SafekeeperPostgresHandler {
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
) -> Result<(), QueryError> {
|
||||
// Get timeline, handling "not found" error
|
||||
let tli = match self.global_timelines.get(self.ttid) {
|
||||
let tli = match GlobalTimelines::get(self.ttid) {
|
||||
Ok(tli) => Ok(Some(tli)),
|
||||
Err(TimelineError::NotFound(_)) => Ok(None),
|
||||
Err(e) => Err(QueryError::Other(e.into())),
|
||||
@@ -397,10 +394,7 @@ impl SafekeeperPostgresHandler {
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
) -> Result<(), QueryError> {
|
||||
let tli = self
|
||||
.global_timelines
|
||||
.get(self.ttid)
|
||||
.map_err(|e| QueryError::Other(e.into()))?;
|
||||
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
|
||||
|
||||
let lsn = if self.is_walproposer_recovery() {
|
||||
// walproposer should get all local WAL until flush_lsn
|
||||
|
||||
@@ -3,16 +3,14 @@ pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
pub use safekeeper_api::models;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
pub async fn task_main(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
conf: SafeKeeperConf,
|
||||
http_listener: std::net::TcpListener,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> anyhow::Result<()> {
|
||||
let router = make_router(conf, global_timelines)
|
||||
let router = make_router(conf)
|
||||
.build()
|
||||
.map_err(|err| anyhow::anyhow!(err))?;
|
||||
let service = utils::http::RouterService::new(router).unwrap();
|
||||
|
||||
@@ -66,13 +66,6 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
|
||||
.as_ref()
|
||||
}
|
||||
|
||||
fn get_global_timelines(request: &Request<Body>) -> Arc<GlobalTimelines> {
|
||||
request
|
||||
.data::<Arc<GlobalTimelines>>()
|
||||
.expect("unknown state type")
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Same as TermLsn, but serializes LSN using display serializer
|
||||
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
@@ -130,11 +123,9 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
|
||||
// Using an `InternalServerError` should be fixed when the types support it
|
||||
let delete_info = global_timelines
|
||||
.delete_force_all_for_tenant(&tenant_id, only_local)
|
||||
let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(
|
||||
@@ -165,9 +156,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
.commit_lsn
|
||||
.segment_lsn(server_info.wal_seg_size as usize)
|
||||
});
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
global_timelines
|
||||
.create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
|
||||
GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
@@ -178,9 +167,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
/// Note: it is possible to do the same with debug_dump.
|
||||
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let res: Vec<TenantTimelineId> = global_timelines
|
||||
.get_all()
|
||||
let res: Vec<TenantTimelineId> = GlobalTimelines::get_all()
|
||||
.iter()
|
||||
.map(|tli| tli.ttid)
|
||||
.collect();
|
||||
@@ -195,8 +182,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
let (inmem, state) = tli.get_state().await;
|
||||
let flush_lsn = tli.get_flush_lsn().await;
|
||||
|
||||
@@ -247,11 +233,9 @@ async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<
|
||||
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
|
||||
// error handling here when we're able to.
|
||||
let resp = global_timelines
|
||||
.delete(&ttid, only_local)
|
||||
let resp = GlobalTimelines::delete(&ttid, only_local)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
@@ -263,9 +247,8 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
|
||||
let data: pull_timeline::Request = json_request(&mut request).await?;
|
||||
let conf = get_conf(&request);
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
|
||||
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone(), global_timelines)
|
||||
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone())
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
@@ -280,8 +263,7 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
|
||||
// so create the chan and write to it in another task.
|
||||
@@ -311,19 +293,19 @@ async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let request_data: TimelineCopyRequest = json_request(&mut request).await?;
|
||||
let source_ttid = TenantTimelineId::new(
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "source_timeline_id")?,
|
||||
);
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let source = GlobalTimelines::get(ttid)?;
|
||||
|
||||
copy_timeline::handle_request(copy_timeline::Request{
|
||||
source_ttid,
|
||||
source,
|
||||
until_lsn: request_data.until_lsn,
|
||||
destination_ttid: TenantTimelineId::new(source_ttid.tenant_id, request_data.target_timeline_id),
|
||||
}, global_timelines)
|
||||
.instrument(info_span!("copy_timeline", from=%source_ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
|
||||
destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
|
||||
})
|
||||
.instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
@@ -340,8 +322,7 @@ async fn patch_control_file_handler(
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
let patch_request: patch_control_file::Request = json_request(&mut request).await?;
|
||||
let response = patch_control_file::handle_request(tli, patch_request)
|
||||
@@ -360,8 +341,7 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let tli = global_timelines.get(ttid)?;
|
||||
let tli = GlobalTimelines::get(ttid)?;
|
||||
tli.write_shared_state()
|
||||
.await
|
||||
.sk
|
||||
@@ -379,7 +359,6 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
|
||||
let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
|
||||
|
||||
@@ -392,7 +371,7 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
|
||||
)))?,
|
||||
};
|
||||
|
||||
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
let tli = tli
|
||||
.wal_residence_guard()
|
||||
.await
|
||||
@@ -414,8 +393,7 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respons
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
let response = tli
|
||||
.backup_partial_reset()
|
||||
@@ -437,8 +415,7 @@ async fn timeline_term_bump_handler(
|
||||
|
||||
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
let response = tli
|
||||
.term_bump(request_data.term)
|
||||
.await
|
||||
@@ -475,8 +452,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
standby_horizon: sk_info.standby_horizon.0,
|
||||
};
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
tli.record_safekeeper_info(proto_sk_info)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
@@ -530,8 +506,6 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
|
||||
let dump_term_history = dump_term_history.unwrap_or(true);
|
||||
let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
|
||||
let args = debug_dump::Args {
|
||||
dump_all,
|
||||
dump_control_file,
|
||||
@@ -543,7 +517,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
let resp = debug_dump::build(args, global_timelines)
|
||||
let resp = debug_dump::build(args)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
@@ -596,10 +570,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
|
||||
}
|
||||
|
||||
/// Safekeeper http router.
|
||||
pub fn make_router(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let mut router = endpoint::make_router();
|
||||
if conf.http_auth.is_some() {
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
@@ -621,8 +592,7 @@ pub fn make_router(
|
||||
// located nearby (/safekeeper/src/http/openapi_spec.yaml).
|
||||
let auth = conf.http_auth.clone();
|
||||
router
|
||||
.data(conf)
|
||||
.data(global_timelines)
|
||||
.data(Arc::new(conf))
|
||||
.data(auth)
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||
|
||||
@@ -11,6 +11,7 @@ use postgres_backend::QueryError;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
|
||||
@@ -20,6 +21,7 @@ use crate::safekeeper::{
|
||||
use crate::safekeeper::{Term, TermHistory, TermLsn};
|
||||
use crate::state::TimelinePersistentState;
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::GlobalTimelines;
|
||||
use postgres_backend::PostgresBackend;
|
||||
use postgres_ffi::encode_logical_message;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
@@ -68,7 +70,7 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
info!("JSON_CTRL request: {append_request:?}");
|
||||
|
||||
// need to init safekeeper state before AppendRequest
|
||||
let tli = prepare_safekeeper(spg, append_request.pg_version).await?;
|
||||
let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
|
||||
|
||||
// if send_proposer_elected is true, we need to update local history
|
||||
if append_request.send_proposer_elected {
|
||||
@@ -97,22 +99,20 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
/// Prepare safekeeper to process append requests without crashes,
|
||||
/// by sending ProposerGreeting with default server.wal_seg_size.
|
||||
async fn prepare_safekeeper(
|
||||
spg: &SafekeeperPostgresHandler,
|
||||
ttid: TenantTimelineId,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<WalResidentTimeline> {
|
||||
let tli = spg
|
||||
.global_timelines
|
||||
.create(
|
||||
spg.ttid,
|
||||
ServerInfo {
|
||||
pg_version,
|
||||
wal_seg_size: WAL_SEGMENT_SIZE as u32,
|
||||
system_id: 0,
|
||||
},
|
||||
Lsn::INVALID,
|
||||
Lsn::INVALID,
|
||||
)
|
||||
.await?;
|
||||
let tli = GlobalTimelines::create(
|
||||
ttid,
|
||||
ServerInfo {
|
||||
pg_version,
|
||||
wal_seg_size: WAL_SEGMENT_SIZE as u32,
|
||||
system_id: 0,
|
||||
},
|
||||
Lsn::INVALID,
|
||||
Lsn::INVALID,
|
||||
)
|
||||
.await?;
|
||||
|
||||
tli.wal_residence_guard().await
|
||||
}
|
||||
|
||||
@@ -455,7 +455,6 @@ pub struct FullTimelineInfo {
|
||||
|
||||
/// Collects metrics for all active timelines.
|
||||
pub struct TimelineCollector {
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
descs: Vec<Desc>,
|
||||
commit_lsn: GenericGaugeVec<AtomicU64>,
|
||||
backup_lsn: GenericGaugeVec<AtomicU64>,
|
||||
@@ -479,8 +478,14 @@ pub struct TimelineCollector {
|
||||
active_timelines_count: IntGauge,
|
||||
}
|
||||
|
||||
impl Default for TimelineCollector {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl TimelineCollector {
|
||||
pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
|
||||
pub fn new() -> TimelineCollector {
|
||||
let mut descs = Vec::new();
|
||||
|
||||
let commit_lsn = GenericGaugeVec::new(
|
||||
@@ -671,7 +676,6 @@ impl TimelineCollector {
|
||||
descs.extend(active_timelines_count.desc().into_iter().cloned());
|
||||
|
||||
TimelineCollector {
|
||||
global_timelines,
|
||||
descs,
|
||||
commit_lsn,
|
||||
backup_lsn,
|
||||
@@ -724,18 +728,17 @@ impl Collector for TimelineCollector {
|
||||
self.written_wal_seconds.reset();
|
||||
self.flushed_wal_seconds.reset();
|
||||
|
||||
let timelines_count = self.global_timelines.get_all().len();
|
||||
let timelines_count = GlobalTimelines::get_all().len();
|
||||
let mut active_timelines_count = 0;
|
||||
|
||||
// Prometheus Collector is sync, and data is stored under async lock. To
|
||||
// bridge the gap with a crutch, collect data in spawned thread with
|
||||
// local tokio runtime.
|
||||
let global_timelines = self.global_timelines.clone();
|
||||
let infos = std::thread::spawn(|| {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.build()
|
||||
.expect("failed to create rt");
|
||||
rt.block_on(collect_timeline_metrics(global_timelines))
|
||||
rt.block_on(collect_timeline_metrics())
|
||||
})
|
||||
.join()
|
||||
.expect("collect_timeline_metrics thread panicked");
|
||||
@@ -854,9 +857,9 @@ impl Collector for TimelineCollector {
|
||||
}
|
||||
}
|
||||
|
||||
async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
|
||||
async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
|
||||
let mut res = vec![];
|
||||
let active_timelines = global_timelines.get_global_broker_active_set().get_all();
|
||||
let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
|
||||
|
||||
for tli in active_timelines {
|
||||
if let Some(info) = tli.info_for_metrics().await {
|
||||
|
||||
@@ -409,9 +409,8 @@ pub struct DebugDumpResponse {
|
||||
pub async fn handle_request(
|
||||
request: Request,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<Response> {
|
||||
let existing_tli = global_timelines.get(TenantTimelineId::new(
|
||||
let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
|
||||
request.tenant_id,
|
||||
request.timeline_id,
|
||||
));
|
||||
@@ -454,14 +453,13 @@ pub async fn handle_request(
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
pull_timeline(status, safekeeper_host, sk_auth_token, global_timelines).await
|
||||
pull_timeline(status, safekeeper_host, sk_auth_token).await
|
||||
}
|
||||
|
||||
async fn pull_timeline(
|
||||
status: TimelineStatus,
|
||||
host: String,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<Response> {
|
||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||
info!(
|
||||
@@ -474,7 +472,7 @@ async fn pull_timeline(
|
||||
status.acceptor_state.epoch
|
||||
);
|
||||
|
||||
let conf = &global_timelines.get_global_config();
|
||||
let conf = &GlobalTimelines::get_global_config();
|
||||
|
||||
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
|
||||
|
||||
@@ -533,9 +531,7 @@ async fn pull_timeline(
|
||||
assert!(status.commit_lsn <= status.flush_lsn);
|
||||
|
||||
// Finally, load the timeline.
|
||||
let _tli = global_timelines
|
||||
.load_temp_timeline(ttid, &tli_dir_path, false)
|
||||
.await?;
|
||||
let _tli = GlobalTimelines::load_temp_timeline(ttid, &tli_dir_path, false).await?;
|
||||
|
||||
Ok(Response {
|
||||
safekeeper_host: host,
|
||||
|
||||
@@ -267,7 +267,6 @@ impl SafekeeperPostgresHandler {
|
||||
pgb_reader: &mut pgb_reader,
|
||||
peer_addr,
|
||||
acceptor_handle: &mut acceptor_handle,
|
||||
global_timelines: self.global_timelines.clone(),
|
||||
};
|
||||
|
||||
// Read first message and create timeline if needed.
|
||||
@@ -332,7 +331,6 @@ struct NetworkReader<'a, IO> {
|
||||
// WalAcceptor is spawned when we learn server info from walproposer and
|
||||
// create timeline; handle is put here.
|
||||
acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
}
|
||||
|
||||
impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
||||
@@ -352,11 +350,10 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
||||
system_id: greeting.system_id,
|
||||
wal_seg_size: greeting.wal_seg_size,
|
||||
};
|
||||
let tli = self
|
||||
.global_timelines
|
||||
.create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
|
||||
.await
|
||||
.context("create timeline")?;
|
||||
let tli =
|
||||
GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
|
||||
.await
|
||||
.context("create timeline")?;
|
||||
tli.wal_residence_guard().await?
|
||||
}
|
||||
_ => {
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::timeline::WalResidentTimeline;
|
||||
use crate::wal_reader_stream::WalReaderStreamBuilder;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::wal_storage::WalReader;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::{bail, Context as AnyhowContext};
|
||||
use bytes::Bytes;
|
||||
use futures::future::Either;
|
||||
@@ -399,10 +400,7 @@ impl SafekeeperPostgresHandler {
|
||||
start_pos: Lsn,
|
||||
term: Option<Term>,
|
||||
) -> Result<(), QueryError> {
|
||||
let tli = self
|
||||
.global_timelines
|
||||
.get(self.ttid)
|
||||
.map_err(|e| QueryError::Other(e.into()))?;
|
||||
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
|
||||
let residence_guard = tli.wal_residence_guard().await?;
|
||||
|
||||
if let Err(end) = self
|
||||
|
||||
@@ -44,8 +44,8 @@ use crate::wal_backup_partial::PartialRemoteSegment;
|
||||
|
||||
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
|
||||
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::{debug_dump, timeline_manager, wal_storage};
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
/// Things safekeeper should know about timeline state on peers.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -467,7 +467,6 @@ pub struct Timeline {
|
||||
walreceivers: Arc<WalReceivers>,
|
||||
timeline_dir: Utf8PathBuf,
|
||||
manager_ctl: ManagerCtl,
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
|
||||
/// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
|
||||
/// this gate, you must respect [`Timeline::cancel`]
|
||||
@@ -490,7 +489,6 @@ impl Timeline {
|
||||
timeline_dir: &Utf8Path,
|
||||
remote_path: &RemotePath,
|
||||
shared_state: SharedState,
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
) -> Arc<Self> {
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
|
||||
watch::channel(shared_state.sk.state().commit_lsn);
|
||||
@@ -518,7 +516,6 @@ impl Timeline {
|
||||
gate: Default::default(),
|
||||
cancel: CancellationToken::default(),
|
||||
manager_ctl: ManagerCtl::new(),
|
||||
conf,
|
||||
broker_active: AtomicBool::new(false),
|
||||
wal_backup_active: AtomicBool::new(false),
|
||||
last_removed_segno: AtomicU64::new(0),
|
||||
@@ -527,14 +524,11 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Load existing timeline from disk.
|
||||
pub fn load_timeline(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
ttid: TenantTimelineId,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
|
||||
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
||||
|
||||
let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
|
||||
let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
|
||||
let shared_state = SharedState::restore(conf, &ttid)?;
|
||||
let timeline_dir = get_timeline_dir(conf, &ttid);
|
||||
let remote_path = remote_timeline_path(&ttid)?;
|
||||
|
||||
Ok(Timeline::new(
|
||||
@@ -542,7 +536,6 @@ impl Timeline {
|
||||
&timeline_dir,
|
||||
&remote_path,
|
||||
shared_state,
|
||||
conf,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -611,7 +604,8 @@ impl Timeline {
|
||||
// it is cancelled, so WAL storage won't be opened again.
|
||||
shared_state.sk.close_wal_store();
|
||||
|
||||
if !only_local && self.conf.is_wal_backup_enabled() {
|
||||
let conf = GlobalTimelines::get_global_config();
|
||||
if !only_local && conf.is_wal_backup_enabled() {
|
||||
// Note: we concurrently delete remote storage data from multiple
|
||||
// safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
|
||||
// do some retries anyway.
|
||||
@@ -957,7 +951,7 @@ impl WalResidentTimeline {
|
||||
|
||||
pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
|
||||
let (_, persisted_state) = self.get_state().await;
|
||||
let enable_remote_read = self.conf.is_wal_backup_enabled();
|
||||
let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
|
||||
|
||||
WalReader::new(
|
||||
&self.ttid,
|
||||
@@ -1067,6 +1061,7 @@ impl ManagerTimeline {
|
||||
|
||||
/// Try to switch state Offloaded->Present.
|
||||
pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
|
||||
let conf = GlobalTimelines::get_global_config();
|
||||
let mut shared = self.write_shared_state().await;
|
||||
|
||||
// trying to restore WAL storage
|
||||
@@ -1074,7 +1069,7 @@ impl ManagerTimeline {
|
||||
&self.ttid,
|
||||
&self.timeline_dir,
|
||||
shared.sk.state(),
|
||||
self.conf.no_sync,
|
||||
conf.no_sync,
|
||||
)?;
|
||||
|
||||
// updating control file
|
||||
@@ -1101,7 +1096,7 @@ impl ManagerTimeline {
|
||||
// now we can switch shared.sk to Present, shouldn't fail
|
||||
let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
|
||||
let cfile_state = prev_sk.take_state();
|
||||
shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);
|
||||
shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, conf.my_id)?);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::{control_file, wal_storage, SafeKeeperConf};
|
||||
use anyhow::{bail, Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
@@ -41,16 +42,23 @@ struct GlobalTimelinesState {
|
||||
// this map is dropped on restart.
|
||||
tombstones: HashMap<TenantTimelineId, Instant>,
|
||||
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
conf: Option<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
global_rate_limiter: RateLimiter,
|
||||
}
|
||||
|
||||
impl GlobalTimelinesState {
|
||||
/// Get configuration, which must be set once during init.
|
||||
fn get_conf(&self) -> &SafeKeeperConf {
|
||||
self.conf
|
||||
.as_ref()
|
||||
.expect("GlobalTimelinesState conf is not initialized")
|
||||
}
|
||||
|
||||
/// Get dependencies for a timeline constructor.
|
||||
fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
|
||||
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
|
||||
(
|
||||
self.conf.clone(),
|
||||
self.get_conf().clone(),
|
||||
self.broker_active_set.clone(),
|
||||
self.global_rate_limiter.clone(),
|
||||
)
|
||||
@@ -74,39 +82,35 @@ impl GlobalTimelinesState {
|
||||
}
|
||||
}
|
||||
|
||||
/// A struct used to manage access to the global timelines map.
|
||||
pub struct GlobalTimelines {
|
||||
state: Mutex<GlobalTimelinesState>,
|
||||
}
|
||||
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
|
||||
Mutex::new(GlobalTimelinesState {
|
||||
timelines: HashMap::new(),
|
||||
tombstones: HashMap::new(),
|
||||
conf: None,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
global_rate_limiter: RateLimiter::new(1, 1),
|
||||
})
|
||||
});
|
||||
|
||||
/// A zero-sized struct used to manage access to the global timelines map.
|
||||
pub struct GlobalTimelines;
|
||||
|
||||
impl GlobalTimelines {
|
||||
/// Create a new instance of the global timelines map.
|
||||
pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
|
||||
Self {
|
||||
state: Mutex::new(GlobalTimelinesState {
|
||||
timelines: HashMap::new(),
|
||||
tombstones: HashMap::new(),
|
||||
conf,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
global_rate_limiter: RateLimiter::new(1, 1),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
|
||||
pub async fn init(&self) -> Result<()> {
|
||||
pub async fn init(conf: SafeKeeperConf) -> Result<()> {
|
||||
// clippy isn't smart enough to understand that drop(state) releases the
|
||||
// lock, so use explicit block
|
||||
let tenants_dir = {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
state.global_rate_limiter = RateLimiter::new(
|
||||
state.conf.partial_backup_concurrency,
|
||||
conf.partial_backup_concurrency,
|
||||
DEFAULT_EVICTION_CONCURRENCY,
|
||||
);
|
||||
state.conf = Some(conf);
|
||||
|
||||
// Iterate through all directories and load tenants for all directories
|
||||
// named as a valid tenant_id.
|
||||
state.conf.workdir.clone()
|
||||
state.get_conf().workdir.clone()
|
||||
};
|
||||
let mut tenant_count = 0;
|
||||
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
|
||||
@@ -118,7 +122,7 @@ impl GlobalTimelines {
|
||||
TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
|
||||
{
|
||||
tenant_count += 1;
|
||||
self.load_tenant_timelines(tenant_id).await?;
|
||||
GlobalTimelines::load_tenant_timelines(tenant_id).await?;
|
||||
}
|
||||
}
|
||||
Err(e) => error!(
|
||||
@@ -131,7 +135,7 @@ impl GlobalTimelines {
|
||||
info!(
|
||||
"found {} tenants directories, successfully loaded {} timelines",
|
||||
tenant_count,
|
||||
self.state.lock().unwrap().timelines.len()
|
||||
TIMELINES_STATE.lock().unwrap().timelines.len()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -139,13 +143,13 @@ impl GlobalTimelines {
|
||||
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
|
||||
/// errors if any.
|
||||
///
|
||||
/// It is async, but self.state lock is sync and there is no important
|
||||
/// It is async, but TIMELINES_STATE lock is sync and there is no important
|
||||
/// reason to make it async (it is always held for a short while), so we
|
||||
/// just lock and unlock it for each timeline -- this function is called
|
||||
/// during init when nothing else is running, so this is fine.
|
||||
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
|
||||
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
state.get_dependencies()
|
||||
};
|
||||
|
||||
@@ -159,10 +163,10 @@ impl GlobalTimelines {
|
||||
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
|
||||
{
|
||||
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
match Timeline::load_timeline(conf.clone(), ttid) {
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(tli) => {
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
self.state
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
@@ -196,30 +200,29 @@ impl GlobalTimelines {
|
||||
}
|
||||
|
||||
/// Get the number of timelines in the map.
|
||||
pub fn timelines_count(&self) -> usize {
|
||||
self.state.lock().unwrap().timelines.len()
|
||||
pub fn timelines_count() -> usize {
|
||||
TIMELINES_STATE.lock().unwrap().timelines.len()
|
||||
}
|
||||
|
||||
/// Get the global safekeeper config.
|
||||
pub fn get_global_config(&self) -> Arc<SafeKeeperConf> {
|
||||
self.state.lock().unwrap().conf.clone()
|
||||
pub fn get_global_config() -> SafeKeeperConf {
|
||||
TIMELINES_STATE.lock().unwrap().get_conf().clone()
|
||||
}
|
||||
|
||||
pub fn get_global_broker_active_set(&self) -> Arc<TimelinesSet> {
|
||||
self.state.lock().unwrap().broker_active_set.clone()
|
||||
pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
|
||||
TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
|
||||
}
|
||||
|
||||
/// Create a new timeline with the given id. If the timeline already exists, returns
|
||||
/// an existing timeline.
|
||||
pub(crate) async fn create(
|
||||
&self,
|
||||
ttid: TenantTimelineId,
|
||||
server_info: ServerInfo,
|
||||
commit_lsn: Lsn,
|
||||
local_start_lsn: Lsn,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let (conf, _, _) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
if let Ok(timeline) = state.get(&ttid) {
|
||||
// Timeline already exists, return it.
|
||||
return Ok(timeline);
|
||||
@@ -242,7 +245,7 @@ impl GlobalTimelines {
|
||||
let state =
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
|
||||
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
|
||||
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
||||
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -258,14 +261,13 @@ impl GlobalTimelines {
|
||||
/// 2) move the directory and load the timeline
|
||||
/// 3) take lock again and insert the timeline into the global map.
|
||||
pub async fn load_temp_timeline(
|
||||
&self,
|
||||
ttid: TenantTimelineId,
|
||||
tmp_path: &Utf8PathBuf,
|
||||
check_tombstone: bool,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
// Check for existence and mark that we're creating it.
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
match state.timelines.get(&ttid) {
|
||||
Some(GlobalMapTimeline::CreationInProgress) => {
|
||||
bail!(TimelineError::CreationInProgress(ttid));
|
||||
@@ -293,10 +295,10 @@ impl GlobalTimelines {
|
||||
};
|
||||
|
||||
// Do the actual move and reflect the result in the map.
|
||||
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
|
||||
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, &conf).await {
|
||||
Ok(timeline) => {
|
||||
let mut timeline_shared_state = timeline.write_shared_state().await;
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(matches!(
|
||||
state.timelines.get(&ttid),
|
||||
Some(GlobalMapTimeline::CreationInProgress)
|
||||
@@ -317,7 +319,7 @@ impl GlobalTimelines {
|
||||
}
|
||||
Err(e) => {
|
||||
// Init failed, remove the marker from the map
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(matches!(
|
||||
state.timelines.get(&ttid),
|
||||
Some(GlobalMapTimeline::CreationInProgress)
|
||||
@@ -332,10 +334,10 @@ impl GlobalTimelines {
|
||||
async fn install_temp_timeline(
|
||||
ttid: TenantTimelineId,
|
||||
tmp_path: &Utf8PathBuf,
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
conf: &SafeKeeperConf,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
|
||||
let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
|
||||
let tenant_path = get_tenant_dir(conf, &ttid.tenant_id);
|
||||
let timeline_path = get_timeline_dir(conf, &ttid);
|
||||
|
||||
// We must have already checked that timeline doesn't exist in the map,
|
||||
// but there might be existing datadir: if timeline is corrupted it is
|
||||
@@ -380,9 +382,9 @@ impl GlobalTimelines {
|
||||
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
|
||||
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
|
||||
/// i.e. loaded in memory and not cancelled.
|
||||
pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
||||
pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
||||
let tli_res = {
|
||||
let state = self.state.lock().unwrap();
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
state.get(&ttid)
|
||||
};
|
||||
match tli_res {
|
||||
@@ -397,8 +399,8 @@ impl GlobalTimelines {
|
||||
}
|
||||
|
||||
/// Returns all timelines. This is used for background timeline processes.
|
||||
pub fn get_all(&self) -> Vec<Arc<Timeline>> {
|
||||
let global_lock = self.state.lock().unwrap();
|
||||
pub fn get_all() -> Vec<Arc<Timeline>> {
|
||||
let global_lock = TIMELINES_STATE.lock().unwrap();
|
||||
global_lock
|
||||
.timelines
|
||||
.values()
|
||||
@@ -417,8 +419,8 @@ impl GlobalTimelines {
|
||||
|
||||
/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
|
||||
/// and that's why it can return cancelled timelines, to retry deleting them.
|
||||
fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
|
||||
let global_lock = self.state.lock().unwrap();
|
||||
fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
|
||||
let global_lock = TIMELINES_STATE.lock().unwrap();
|
||||
global_lock
|
||||
.timelines
|
||||
.values()
|
||||
@@ -433,12 +435,11 @@ impl GlobalTimelines {
|
||||
/// Cancels timeline, then deletes the corresponding data directory.
|
||||
/// If only_local, doesn't remove WAL segments in remote storage.
|
||||
pub(crate) async fn delete(
|
||||
&self,
|
||||
ttid: &TenantTimelineId,
|
||||
only_local: bool,
|
||||
) -> Result<TimelineDeleteForceResult> {
|
||||
let tli_res = {
|
||||
let state = self.state.lock().unwrap();
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
|
||||
if state.tombstones.contains_key(ttid) {
|
||||
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
|
||||
@@ -471,7 +472,7 @@ impl GlobalTimelines {
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeline is not memory, but it may still exist on disk in broken state.
|
||||
let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
|
||||
let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid);
|
||||
let dir_existed = delete_dir(dir_path)?;
|
||||
|
||||
Ok(TimelineDeleteForceResult {
|
||||
@@ -484,7 +485,7 @@ impl GlobalTimelines {
|
||||
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
|
||||
// are used to prevent still-running computes from re-creating the same timeline when they send data,
|
||||
// and to speed up repeated deletion calls by avoiding re-listing objects.
|
||||
self.state.lock().unwrap().delete(*ttid);
|
||||
TIMELINES_STATE.lock().unwrap().delete(*ttid);
|
||||
|
||||
result
|
||||
}
|
||||
@@ -496,18 +497,17 @@ impl GlobalTimelines {
|
||||
///
|
||||
/// If only_local, doesn't remove WAL segments in remote storage.
|
||||
pub async fn delete_force_all_for_tenant(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
only_local: bool,
|
||||
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
|
||||
info!("deleting all timelines for tenant {}", tenant_id);
|
||||
let to_delete = self.get_all_for_tenant(*tenant_id);
|
||||
let to_delete = Self::get_all_for_tenant(*tenant_id);
|
||||
|
||||
let mut err = None;
|
||||
|
||||
let mut deleted = HashMap::new();
|
||||
for tli in &to_delete {
|
||||
match self.delete(&tli.ttid, only_local).await {
|
||||
match Self::delete(&tli.ttid, only_local).await {
|
||||
Ok(result) => {
|
||||
deleted.insert(tli.ttid, result);
|
||||
}
|
||||
@@ -529,15 +529,15 @@ impl GlobalTimelines {
|
||||
// so the directory may be not empty. In this case timelines will have bad state
|
||||
// and timeline background jobs can panic.
|
||||
delete_dir(get_tenant_dir(
|
||||
self.state.lock().unwrap().conf.as_ref(),
|
||||
TIMELINES_STATE.lock().unwrap().get_conf(),
|
||||
tenant_id,
|
||||
))?;
|
||||
|
||||
Ok(deleted)
|
||||
}
|
||||
|
||||
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
pub fn housekeeping(tombstone_ttl: &Duration) {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
|
||||
// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
|
||||
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
//!
|
||||
use anyhow::{Context, Result};
|
||||
use postgres_backend::QueryError;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_io_timeout::TimeoutReader;
|
||||
@@ -12,9 +11,9 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{auth::Scope, measured_stream::MeasuredStream};
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::metrics::TrafficMetrics;
|
||||
use crate::SafeKeeperConf;
|
||||
use crate::{handler::SafekeeperPostgresHandler, GlobalTimelines};
|
||||
use postgres_backend::{AuthType, PostgresBackend};
|
||||
|
||||
/// Accept incoming TCP connections and spawn them into a background thread.
|
||||
@@ -23,10 +22,9 @@ use postgres_backend::{AuthType, PostgresBackend};
|
||||
/// to any tenant are allowed) or Tenant (only tokens giving access to specific
|
||||
/// tenant are allowed). Doesn't matter if auth is disabled in conf.
|
||||
pub async fn task_main(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
conf: SafeKeeperConf,
|
||||
pg_listener: std::net::TcpListener,
|
||||
allowed_auth_scope: Scope,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Tokio's from_std won't do this for us, per its comment.
|
||||
pg_listener.set_nonblocking(true)?;
|
||||
@@ -39,10 +37,10 @@ pub async fn task_main(
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
let conf = conf.clone();
|
||||
let conn_id = issue_connection_id(&mut connection_count);
|
||||
let global_timelines = global_timelines.clone();
|
||||
|
||||
tokio::spawn(
|
||||
async move {
|
||||
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await {
|
||||
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope).await {
|
||||
error!("connection handler exited: {}", err);
|
||||
}
|
||||
}
|
||||
@@ -55,10 +53,9 @@ pub async fn task_main(
|
||||
///
|
||||
async fn handle_socket(
|
||||
socket: TcpStream,
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
conf: SafeKeeperConf,
|
||||
conn_id: ConnectionId,
|
||||
allowed_auth_scope: Scope,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<(), QueryError> {
|
||||
socket.set_nodelay(true)?;
|
||||
let peer_addr = socket.peer_addr()?;
|
||||
@@ -99,13 +96,8 @@ async fn handle_socket(
|
||||
Some(_) => AuthType::NeonJWT,
|
||||
};
|
||||
let auth_pair = auth_key.map(|key| (allowed_auth_scope, key));
|
||||
let mut conn_handler = SafekeeperPostgresHandler::new(
|
||||
conf,
|
||||
conn_id,
|
||||
Some(traffic_metrics.clone()),
|
||||
auth_pair,
|
||||
global_timelines,
|
||||
);
|
||||
let mut conn_handler =
|
||||
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()), auth_pair);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
// libpq protocol between safekeeper and walproposer / pageserver
|
||||
// We don't use shutdown.
|
||||
|
||||
@@ -636,13 +636,6 @@ impl Persistence {
|
||||
.into_boxed(),
|
||||
};
|
||||
|
||||
// Clear generation_pageserver if we are moving into a state where we won't have
|
||||
// any attached pageservers.
|
||||
let input_generation_pageserver = match input_placement_policy {
|
||||
None | Some(PlacementPolicy::Attached(_)) => None,
|
||||
Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None),
|
||||
};
|
||||
|
||||
#[derive(AsChangeset)]
|
||||
#[diesel(table_name = crate::schema::tenant_shards)]
|
||||
struct ShardUpdate {
|
||||
@@ -650,7 +643,6 @@ impl Persistence {
|
||||
placement_policy: Option<String>,
|
||||
config: Option<String>,
|
||||
scheduling_policy: Option<String>,
|
||||
generation_pageserver: Option<Option<i64>>,
|
||||
}
|
||||
|
||||
let update = ShardUpdate {
|
||||
@@ -663,7 +655,6 @@ impl Persistence {
|
||||
.map(|c| serde_json::to_string(&c).unwrap()),
|
||||
scheduling_policy: input_scheduling_policy
|
||||
.map(|p| serde_json::to_string(&p).unwrap()),
|
||||
generation_pageserver: input_generation_pageserver,
|
||||
};
|
||||
|
||||
query.set(update).execute(conn)?;
|
||||
|
||||
@@ -513,9 +513,6 @@ struct ShardUpdate {
|
||||
|
||||
/// If this is None, generation is not updated.
|
||||
generation: Option<Generation>,
|
||||
|
||||
/// If this is None, scheduling policy is not updated.
|
||||
scheduling_policy: Option<ShardSchedulingPolicy>,
|
||||
}
|
||||
|
||||
enum StopReconciliationsReason {
|
||||
@@ -792,7 +789,7 @@ impl Service {
|
||||
node_list_futs.push({
|
||||
async move {
|
||||
tracing::info!("Scanning shards on node {node}...");
|
||||
let timeout = Duration::from_secs(5);
|
||||
let timeout = Duration::from_secs(1);
|
||||
let response = node
|
||||
.with_client_retries(
|
||||
|client| async move { client.list_location_config().await },
|
||||
@@ -2379,23 +2376,6 @@ impl Service {
|
||||
}
|
||||
};
|
||||
|
||||
// Ordinarily we do not update scheduling policy, but when making major changes
|
||||
// like detaching or demoting to secondary-only, we need to force the scheduling
|
||||
// mode to Active, or the caller's expected outcome (detach it) will not happen.
|
||||
let scheduling_policy = match req.config.mode {
|
||||
LocationConfigMode::Detached | LocationConfigMode::Secondary => {
|
||||
// Special case: when making major changes like detaching or demoting to secondary-only,
|
||||
// we need to force the scheduling mode to Active, or nothing will happen.
|
||||
Some(ShardSchedulingPolicy::Active)
|
||||
}
|
||||
LocationConfigMode::AttachedMulti
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => {
|
||||
// While attached, continue to respect whatever the existing scheduling mode is.
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let mut create = true;
|
||||
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
// Saw an existing shard: this is not a creation
|
||||
@@ -2421,7 +2401,6 @@ impl Service {
|
||||
placement_policy: placement_policy.clone(),
|
||||
tenant_config: req.config.tenant_conf.clone(),
|
||||
generation: set_generation,
|
||||
scheduling_policy,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2518,7 +2497,6 @@ impl Service {
|
||||
placement_policy,
|
||||
tenant_config,
|
||||
generation,
|
||||
scheduling_policy,
|
||||
} in &updates
|
||||
{
|
||||
self.persistence
|
||||
@@ -2527,7 +2505,7 @@ impl Service {
|
||||
Some(placement_policy.clone()),
|
||||
Some(tenant_config.clone()),
|
||||
*generation,
|
||||
*scheduling_policy,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -2543,7 +2521,6 @@ impl Service {
|
||||
placement_policy,
|
||||
tenant_config,
|
||||
generation: update_generation,
|
||||
scheduling_policy,
|
||||
} in updates
|
||||
{
|
||||
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
|
||||
@@ -2562,10 +2539,6 @@ impl Service {
|
||||
shard.generation = Some(generation);
|
||||
}
|
||||
|
||||
if let Some(scheduling_policy) = scheduling_policy {
|
||||
shard.set_scheduling_policy(scheduling_policy);
|
||||
}
|
||||
|
||||
shard.schedule(scheduler, &mut schedule_context)?;
|
||||
|
||||
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
|
||||
@@ -3019,17 +2992,9 @@ impl Service {
|
||||
|
||||
let TenantPolicyRequest {
|
||||
placement,
|
||||
mut scheduling,
|
||||
scheduling,
|
||||
} = req;
|
||||
|
||||
if let Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) = placement {
|
||||
// When someone configures a tenant to detach, we force the scheduling policy to enable
|
||||
// this to take effect.
|
||||
if scheduling.is_none() {
|
||||
scheduling = Some(ShardSchedulingPolicy::Active);
|
||||
}
|
||||
}
|
||||
|
||||
self.persistence
|
||||
.update_tenant_shard(
|
||||
TenantFilter::Tenant(tenant_id),
|
||||
|
||||
@@ -459,10 +459,12 @@ pub async fn get_timeline_objects(
|
||||
Ok(list.keys)
|
||||
}
|
||||
|
||||
const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
|
||||
/// Drain a buffer of keys into DeleteObjects requests
|
||||
///
|
||||
/// If `drain` is true, drains keys completely; otherwise stops when <
|
||||
/// `max_keys_per_delete`` keys are left.
|
||||
/// MAX_KEYS_PER_DELETE keys are left.
|
||||
/// `num_deleted` returns number of deleted keys.
|
||||
async fn do_delete(
|
||||
remote_client: &GenericRemoteStorage,
|
||||
@@ -472,10 +474,9 @@ async fn do_delete(
|
||||
progress_tracker: &mut DeletionProgressTracker,
|
||||
) -> anyhow::Result<()> {
|
||||
let cancel = CancellationToken::new();
|
||||
let max_keys_per_delete = remote_client.max_keys_per_delete();
|
||||
while (!keys.is_empty() && drain) || (keys.len() >= max_keys_per_delete) {
|
||||
while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
|
||||
let request_keys =
|
||||
keys.split_off(keys.len() - (std::cmp::min(max_keys_per_delete, keys.len())));
|
||||
keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
|
||||
|
||||
let request_keys: Vec<RemotePath> = request_keys.into_iter().map(|o| o.key).collect();
|
||||
|
||||
@@ -616,7 +617,7 @@ pub async fn purge_garbage(
|
||||
}
|
||||
|
||||
objects_to_delete.append(&mut object_list);
|
||||
if objects_to_delete.len() >= remote_client.max_keys_per_delete() {
|
||||
if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
|
||||
do_delete(
|
||||
&remote_client,
|
||||
&mut objects_to_delete,
|
||||
|
||||
@@ -268,7 +268,7 @@ impl BucketConfig {
|
||||
config.bucket_name, config.bucket_region
|
||||
),
|
||||
RemoteStorageKind::AzureContainer(config) => format!(
|
||||
"container {}, storage account {:?}, region {}",
|
||||
"bucket {}, storage account {:?}, region {}",
|
||||
config.container_name, config.storage_account, config.container_region
|
||||
),
|
||||
}
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
# How to run the `pg_regress` tests on a cloud Neon instance.
|
||||
|
||||
* Create a Neon project on staging.
|
||||
* Grant the superuser privileges to the DB user.
|
||||
* (Optional) create a branch for testing
|
||||
* Configure the endpoint by updating the control-plane database with the following settings:
|
||||
* `Timeone`: `America/Los_Angeles`
|
||||
* `DateStyle`: `Postgres,MDY`
|
||||
* `compute_query_id`: `off`
|
||||
* Checkout the actual `Neon` sources
|
||||
* Patch the sql and expected files for the specific PostgreSQL version, e.g. for v17:
|
||||
```bash
|
||||
$ cd vendor/postgres-v17
|
||||
$ patch -p1 <../../compute/patches/cloud_regress_pg17.patch
|
||||
```
|
||||
* Set the environment variable `BENCHMARK_CONNSTR` to the connection URI of your project.
|
||||
* Set the environment variable `PG_VERSION` to the version of your project.
|
||||
* Run
|
||||
```bash
|
||||
$ pytest -m remote_cluster -k cloud_regress
|
||||
```
|
||||
@@ -5,15 +5,68 @@ Run the regression tests on the cloud instance of Neon
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import RemotePostgres
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def setup(remote_pg: RemotePostgres):
|
||||
"""
|
||||
Setup and teardown of the tests
|
||||
"""
|
||||
with psycopg2.connect(remote_pg.connstr()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
log.info("Creating the extension")
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS regress_so")
|
||||
conn.commit()
|
||||
# TODO: Migrate to branches and remove this code
|
||||
log.info("Looking for subscriptions in the regress database")
|
||||
cur.execute(
|
||||
"SELECT subname FROM pg_catalog.pg_subscription WHERE "
|
||||
"subdbid = (SELECT oid FROM pg_catalog.pg_database WHERE datname='regression');"
|
||||
)
|
||||
if cur.rowcount > 0:
|
||||
with psycopg2.connect(
|
||||
dbname="regression",
|
||||
host=remote_pg.default_options["host"],
|
||||
user=remote_pg.default_options["user"],
|
||||
password=remote_pg.default_options["password"],
|
||||
) as regress_conn:
|
||||
with regress_conn.cursor() as regress_cur:
|
||||
for sub in cur:
|
||||
regress_cur.execute(f"ALTER SUBSCRIPTION {sub[0]} DISABLE")
|
||||
regress_cur.execute(
|
||||
f"ALTER SUBSCRIPTION {sub[0]} SET (slot_name = NONE)"
|
||||
)
|
||||
regress_cur.execute(f"DROP SUBSCRIPTION {sub[0]}")
|
||||
regress_conn.commit()
|
||||
|
||||
yield
|
||||
# TODO: Migrate to branches and remove this code
|
||||
log.info("Looking for extra roles...")
|
||||
with psycopg2.connect(remote_pg.connstr()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT rolname FROM pg_catalog.pg_roles WHERE oid > 16384 AND rolname <> 'neondb_owner'"
|
||||
)
|
||||
roles: list[Any] = []
|
||||
for role in cur:
|
||||
log.info("Role found: %s", role[0])
|
||||
roles.append(role[0])
|
||||
for role in roles:
|
||||
cur.execute(f"DROP ROLE {role}")
|
||||
conn.commit()
|
||||
|
||||
|
||||
@pytest.mark.timeout(7200)
|
||||
@pytest.mark.remote_cluster
|
||||
def test_cloud_regress(
|
||||
setup,
|
||||
remote_pg: RemotePostgres,
|
||||
pg_version: PgVersion,
|
||||
pg_distrib_dir: Path,
|
||||
|
||||
@@ -152,8 +152,6 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
||||
"pageserver_resident_physical_size",
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"pageserver_last_record_lsn",
|
||||
"pageserver_disk_consistent_lsn",
|
||||
"pageserver_projected_remote_consistent_lsn",
|
||||
"pageserver_standby_horizon",
|
||||
"pageserver_smgr_query_seconds_bucket",
|
||||
"pageserver_smgr_query_seconds_count",
|
||||
@@ -175,8 +173,6 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
||||
counter("pageserver_tenant_throttling_count_accounted_finish"),
|
||||
counter("pageserver_tenant_throttling_wait_usecs_sum"),
|
||||
counter("pageserver_tenant_throttling_count"),
|
||||
counter("pageserver_timeline_wal_records_received"),
|
||||
counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
|
||||
*histogram("pageserver_page_service_batch_size"),
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
# "pageserver_directory_entries_count", -- only used if above a certain threshold
|
||||
|
||||
@@ -850,7 +850,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
force_repartition=False,
|
||||
force_image_layer_creation=False,
|
||||
force_l0_compaction=False,
|
||||
wait_until_flushed=True,
|
||||
wait_until_uploaded=False,
|
||||
compact: bool | None = None,
|
||||
**kwargs,
|
||||
@@ -863,8 +862,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
query["force_image_layer_creation"] = "true"
|
||||
if force_l0_compaction:
|
||||
query["force_l0_compaction"] = "true"
|
||||
if not wait_until_flushed:
|
||||
query["wait_until_flushed"] = "false"
|
||||
if wait_until_uploaded:
|
||||
query["wait_until_uploaded"] = "true"
|
||||
|
||||
@@ -872,7 +869,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
query["compact"] = "true" if compact else "false"
|
||||
|
||||
log.info(
|
||||
f"Requesting checkpoint: tenant={tenant_id} timeline={timeline_id} wait_until_flushed={wait_until_flushed} wait_until_uploaded={wait_until_uploaded} compact={compact}"
|
||||
f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}"
|
||||
)
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
|
||||
|
||||
@@ -54,15 +54,23 @@ def wait_for_upload(
|
||||
tenant: TenantId | TenantShardId,
|
||||
timeline: TimelineId,
|
||||
lsn: Lsn,
|
||||
timeout=20,
|
||||
):
|
||||
"""Waits for local timeline upload up to specified LSN"""
|
||||
"""waits for local timeline upload up to specified lsn"""
|
||||
|
||||
def is_uploaded():
|
||||
remote_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
|
||||
assert remote_lsn >= lsn, f"remote_consistent_lsn at {remote_lsn}"
|
||||
|
||||
wait_until(is_uploaded, name=f"upload to {lsn}", timeout=timeout)
|
||||
current_lsn = Lsn(0)
|
||||
for i in range(20):
|
||||
current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
|
||||
if current_lsn >= lsn:
|
||||
log.info("wait finished")
|
||||
return
|
||||
lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
||||
log.info(
|
||||
f"waiting for remote_consistent_lsn to reach {lsn}, now {current_lsn}, last_record_lsn={lr_lsn}, iteration {i + 1}"
|
||||
)
|
||||
time.sleep(1)
|
||||
raise Exception(
|
||||
f"timed out while waiting for {tenant}/{timeline} remote_consistent_lsn to reach {lsn}, was {current_lsn}"
|
||||
)
|
||||
|
||||
|
||||
def _tenant_in_expected_state(tenant_info: dict[str, Any], expected_state: str):
|
||||
|
||||
201
test_runner/fixtures/pageserver_mitm.py
Normal file
201
test_runner/fixtures/pageserver_mitm.py
Normal file
@@ -0,0 +1,201 @@
|
||||
# Intercept compute -> pageserver connections, to simulate various failure modes
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import struct
|
||||
import threading
|
||||
import traceback
|
||||
from asyncio import TaskGroup
|
||||
from enum import Enum
|
||||
|
||||
from fixtures.log_helper import log
|
||||
|
||||
|
||||
class ConnectionState(Enum):
|
||||
HANDSHAKE = (1,)
|
||||
AUTHOK = (2,)
|
||||
COPYBOTH = (3,)
|
||||
|
||||
|
||||
class BreakConnectionException(Exception):
|
||||
def __init__(self, message):
|
||||
super().__init__(message)
|
||||
self.message = message
|
||||
|
||||
|
||||
class ProxyShutdownException(Exception):
|
||||
"""Exception raised to shut down connection when the proxy is shut down."""
|
||||
|
||||
|
||||
# The handshake flow:
|
||||
#
|
||||
# 1. compute establishes TCP connection
|
||||
# 2. libpq handshake and auth
|
||||
# 3. Enter CopyBoth mode
|
||||
#
|
||||
# From then on, CopyData messages are exchanged in both directions
|
||||
class Connection:
|
||||
def __init__(
|
||||
self,
|
||||
conn_id,
|
||||
compute_reader,
|
||||
compute_writer,
|
||||
shutdown_event,
|
||||
dest_port: int,
|
||||
response_cb=None,
|
||||
):
|
||||
self.conn_id = conn_id
|
||||
self.compute_reader = compute_reader
|
||||
self.compute_writer = compute_writer
|
||||
self.shutdown_event = shutdown_event
|
||||
self.response_cb = response_cb
|
||||
self.dest_port = dest_port
|
||||
|
||||
self.state = ConnectionState.HANDSHAKE
|
||||
|
||||
async def run(self):
|
||||
async def wait_for_shutdown():
|
||||
await self.shutdown_event.wait()
|
||||
raise ProxyShutdownException
|
||||
|
||||
try:
|
||||
addr = self.compute_writer.get_extra_info("peername")
|
||||
log.info(f"[{self.conn_id}] connection received from {addr}")
|
||||
|
||||
async with TaskGroup() as group:
|
||||
group.create_task(wait_for_shutdown())
|
||||
|
||||
self.ps_reader, self.ps_writer = await asyncio.open_connection(
|
||||
"localhost", self.dest_port
|
||||
)
|
||||
|
||||
group.create_task(self.handle_compute_to_pageserver())
|
||||
group.create_task(self.handle_pageserver_to_compute())
|
||||
|
||||
except* ProxyShutdownException:
|
||||
log.info(f"[{self.conn_id}] proxy shutting down")
|
||||
|
||||
except* asyncio.exceptions.IncompleteReadError as e:
|
||||
log.info(f"[{self.conn_id}] EOF reached: {e}")
|
||||
|
||||
except* BreakConnectionException as eg:
|
||||
for e in eg.exceptions:
|
||||
log.info(f"[{self.conn_id}] callback breaks connection: {e}")
|
||||
|
||||
except* Exception as e:
|
||||
s = "\n".join(traceback.format_exception(e))
|
||||
log.info(f"[{self.conn_id}] {s}")
|
||||
|
||||
self.compute_writer.close()
|
||||
self.ps_writer.close()
|
||||
await self.compute_writer.wait_closed()
|
||||
await self.ps_writer.wait_closed()
|
||||
|
||||
async def handle_compute_to_pageserver(self):
|
||||
while self.state == ConnectionState.HANDSHAKE:
|
||||
rawmsg = await self.compute_reader.read(1000)
|
||||
log.debug(f"[{self.conn_id}] C -> PS: handshake msg len {len(rawmsg)}")
|
||||
self.ps_writer.write(rawmsg)
|
||||
await self.ps_writer.drain()
|
||||
|
||||
while True:
|
||||
msgtype = await self.compute_reader.readexactly(1)
|
||||
msglen_bytes = await self.compute_reader.readexactly(4)
|
||||
(msglen,) = struct.unpack("!L", msglen_bytes)
|
||||
payload = await self.compute_reader.readexactly(msglen - 4)
|
||||
|
||||
# request_callback()
|
||||
# CopyData
|
||||
if msgtype == b"d":
|
||||
# TODO: call callback
|
||||
log.debug(f"[{self.conn_id}] C -> PS: CopyData ({msglen} bytes)")
|
||||
pass
|
||||
else:
|
||||
log.debug(f"[{self.conn_id}] C -> PS: message of type '{msgtype}' ({msglen} bytes)")
|
||||
|
||||
self.ps_writer.write(msgtype)
|
||||
self.ps_writer.write(msglen_bytes)
|
||||
self.ps_writer.write(payload)
|
||||
await self.ps_writer.drain()
|
||||
|
||||
async def handle_pageserver_to_compute(self):
|
||||
while True:
|
||||
msgtype = await self.ps_reader.readexactly(1)
|
||||
|
||||
# response to SSLRequest
|
||||
if msgtype == b"N" and self.state == ConnectionState.HANDSHAKE:
|
||||
log.debug(f"[{self.conn_id}] PS -> C: N")
|
||||
self.compute_writer.write(msgtype)
|
||||
await self.compute_writer.drain()
|
||||
continue
|
||||
|
||||
msglen_bytes = await self.ps_reader.readexactly(4)
|
||||
(msglen,) = struct.unpack("!L", msglen_bytes)
|
||||
payload = await self.ps_reader.readexactly(msglen - 4)
|
||||
|
||||
# AuthenticationOK
|
||||
if msgtype == b"R":
|
||||
self.state = ConnectionState.AUTHOK
|
||||
log.debug(f"[{self.conn_id}] PS -> C: AuthenticationOK ({msglen} bytes)")
|
||||
|
||||
# CopyBothresponse
|
||||
elif msgtype == b"W":
|
||||
self.state = ConnectionState.COPYBOTH
|
||||
log.debug(f"[{self.conn_id}] PS -> C: CopyBothResponse ({msglen} bytes)")
|
||||
|
||||
# CopyData
|
||||
elif msgtype == b"d":
|
||||
log.debug(f"[{self.conn_id}] PS -> C: CopyData ({msglen} bytes)")
|
||||
if self.response_cb is not None:
|
||||
await self.response_cb(self.conn_id)
|
||||
pass
|
||||
|
||||
else:
|
||||
log.debug(f"[{self.conn_id}] PS -> C: message of type '{msgtype}' ({msglen} bytes)")
|
||||
|
||||
self.compute_writer.write(msgtype)
|
||||
self.compute_writer.write(msglen_bytes)
|
||||
self.compute_writer.write(payload)
|
||||
await self.compute_writer.drain()
|
||||
|
||||
|
||||
class PageserverProxy:
|
||||
def __init__(self, listen_port: int, dest_port: int, response_cb=None):
|
||||
self.listen_port = listen_port
|
||||
self.dest_port = dest_port
|
||||
self.response_cb = response_cb
|
||||
self.conn_counter = 0
|
||||
self.shutdown_event = asyncio.Event()
|
||||
|
||||
def shutdown(self):
|
||||
self.serve_task.cancel()
|
||||
self.shutdown_event.set()
|
||||
|
||||
async def handle_client(self, compute_reader, compute_writer):
|
||||
self.conn_counter += 1
|
||||
conn_id = self.conn_counter
|
||||
conn = Connection(
|
||||
conn_id,
|
||||
compute_reader,
|
||||
compute_writer,
|
||||
self.shutdown_event,
|
||||
self.dest_port,
|
||||
self.response_cb,
|
||||
)
|
||||
await conn.run()
|
||||
|
||||
async def run_server(self):
|
||||
log.info("run_server called")
|
||||
server = await asyncio.start_server(self.handle_client, "localhost", self.listen_port)
|
||||
|
||||
self.serve_task = asyncio.create_task(server.serve_forever())
|
||||
|
||||
try:
|
||||
await self.serve_task
|
||||
except asyncio.CancelledError:
|
||||
log.info("proxy shutting down")
|
||||
|
||||
def launch_server_in_thread(self):
|
||||
t1 = threading.Thread(target=asyncio.run, args=self.run_server)
|
||||
t1.start()
|
||||
@@ -1,142 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import random
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.common_types import Lsn
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.utils import (
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
wait_for_upload_queue_empty,
|
||||
)
|
||||
from fixtures.remote_storage import s3_storage
|
||||
|
||||
|
||||
@pytest.mark.timeout(900)
|
||||
@pytest.mark.parametrize("size", [8, 1024, 8192])
|
||||
@pytest.mark.parametrize("s3", [True, False], ids=["s3", "local"])
|
||||
@pytest.mark.parametrize("backpressure", [True, False], ids=["backpressure", "nobackpressure"])
|
||||
@pytest.mark.parametrize("fsync", [True, False], ids=["fsync", "nofsync"])
|
||||
def test_ingest_insert_bulk(
|
||||
request: pytest.FixtureRequest,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
fsync: bool,
|
||||
backpressure: bool,
|
||||
s3: bool,
|
||||
size: int,
|
||||
):
|
||||
"""
|
||||
Benchmarks ingestion of 5 GB of sequential insert WAL. Measures ingestion and S3 upload
|
||||
separately. Also does a Safekeeper→Pageserver re-ingestion to measure Pageserver ingestion in
|
||||
isolation.
|
||||
"""
|
||||
|
||||
CONCURRENCY = 1 # 1 is optimal without fsync or backpressure
|
||||
VOLUME = 5 * 1024**3
|
||||
rows = VOLUME // (size + 64) # +64 roughly accounts for per-row WAL overhead
|
||||
|
||||
neon_env_builder.safekeepers_enable_fsync = fsync
|
||||
|
||||
if s3:
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
# NB: don't use S3 for Safekeeper. It doesn't affect throughput (no backpressure), but it
|
||||
# would compete with Pageserver for bandwidth.
|
||||
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
|
||||
|
||||
neon_env_builder.disable_scrub_on_exit() # immediate shutdown may leave stray layers
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
f"fsync = {fsync}",
|
||||
"max_replication_apply_lag = 0",
|
||||
f"max_replication_flush_lag = {'10GB' if backpressure else '0'}",
|
||||
# NB: neon_local defaults to 15MB, which is too slow -- production uses 500MB.
|
||||
f"max_replication_write_lag = {'500MB' if backpressure else '0'}",
|
||||
],
|
||||
)
|
||||
endpoint.safe_psql("create extension neon")
|
||||
|
||||
# Wait for the timeline to be propagated to the pageserver.
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
# Ingest rows.
|
||||
log.info("Ingesting data")
|
||||
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
def insert_rows(endpoint, table, count, value):
|
||||
with endpoint.connect().cursor() as cur:
|
||||
cur.execute("set statement_timeout = 0")
|
||||
cur.execute(f"create table {table} (id int, data bytea)")
|
||||
cur.execute(f"insert into {table} values (generate_series(1, {count}), %s)", (value,))
|
||||
|
||||
with zenbenchmark.record_duration("upload"):
|
||||
with zenbenchmark.record_duration("ingest"):
|
||||
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
|
||||
for i in range(CONCURRENCY):
|
||||
# Write a random value for all rows. This is sufficient to prevent compression,
|
||||
# e.g. in TOAST. Randomly generating every row is too slow.
|
||||
value = random.randbytes(size)
|
||||
worker_rows = rows / CONCURRENCY
|
||||
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
|
||||
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
# Wait for pageserver to ingest the WAL.
|
||||
client = env.pageserver.http_client()
|
||||
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
|
||||
|
||||
# Wait for pageserver S3 upload. Checkpoint to flush the last in-memory layer.
|
||||
client.timeline_checkpoint(
|
||||
env.initial_tenant,
|
||||
env.initial_timeline,
|
||||
compact=False,
|
||||
wait_until_flushed=False,
|
||||
)
|
||||
wait_for_upload(client, env.initial_tenant, env.initial_timeline, end_lsn, timeout=600)
|
||||
|
||||
# Empty out upload queue for next benchmark.
|
||||
wait_for_upload_queue_empty(client, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
backpressure_time = endpoint.safe_psql("select backpressure_throttling_time()")[0][0]
|
||||
|
||||
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
|
||||
# reingest all the WAL directly from the safekeeper. This gives us a baseline of how fast the
|
||||
# pageserver can ingest this WAL in isolation.
|
||||
status = env.storage_controller.inspect(tenant_shard_id=env.initial_tenant)
|
||||
assert status is not None
|
||||
|
||||
endpoint.stop() # avoid spurious getpage errors
|
||||
client.tenant_delete(env.initial_tenant)
|
||||
env.pageserver.tenant_create(tenant_id=env.initial_tenant, generation=status[0])
|
||||
|
||||
with zenbenchmark.record_duration("recover"):
|
||||
log.info("Recovering WAL into pageserver")
|
||||
client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
|
||||
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
|
||||
|
||||
# Emit metrics.
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||
zenbenchmark.record("row_count", rows, "rows", MetricReport.TEST_PARAM)
|
||||
zenbenchmark.record("concurrency", CONCURRENCY, "clients", MetricReport.TEST_PARAM)
|
||||
zenbenchmark.record(
|
||||
"backpressure_time", backpressure_time // 1000, "ms", MetricReport.LOWER_IS_BETTER
|
||||
)
|
||||
|
||||
props = {p["name"]: p["value"] for _, p in request.node.user_properties}
|
||||
for name in ("ingest", "upload", "recover"):
|
||||
throughput = int(wal_written_mb / props[name])
|
||||
zenbenchmark.record(f"{name}_throughput", throughput, "MB/s", MetricReport.HIGHER_IS_BETTER)
|
||||
|
||||
# Pageserver shutdown will likely get stuck on the upload queue, just shut it down immediately.
|
||||
env.stop(immediate=True)
|
||||
@@ -1,12 +1,16 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
from asyncio import TaskGroup
|
||||
|
||||
import psycopg2.errors
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
from fixtures.pageserver_mitm import BreakConnectionException, PageserverProxy
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@@ -80,3 +84,197 @@ def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
|
||||
# do a graceful shutdown which would had caught the allowed_errors before
|
||||
# https://github.com/neondatabase/neon/pull/8632
|
||||
env.pageserver.stop()
|
||||
|
||||
|
||||
#
|
||||
# Observations:
|
||||
#
|
||||
# 1. If the backend is waiting for response to GetPage request, and the client disconnects,
|
||||
# the backend will not immediately abort the GetPage request. It will not notice that the
|
||||
# client is gone, until it tries to send something back to the client, or if a timeout
|
||||
# kills the query.
|
||||
#
|
||||
# So to reproduce the traffic jam, you need:
|
||||
#
|
||||
# - A network glitch, which causes one GetPage request/response to be lost or delayed.
|
||||
# It might get lost at IP level, and TCP retransmits might take a long time. Or there might
|
||||
# be a glitch in the pageserver or compute, which causes the request to be "stuck".
|
||||
#
|
||||
# - An application with a application level timeout and retry. If the
|
||||
# query doesn't return in a timely a fashion, the application kills the connection and
|
||||
# retries the query, or a runs similar query that needs the same page.
|
||||
#
|
||||
# The first time the GetPage request is stuck and it disconnects, it leaves behind a
|
||||
# backend that's still waiting for the GetPage response, and is holding the buffer lock.
|
||||
# The client has closed the connection, but the server doesn't get the memo.
|
||||
# On each subsequent retry, the connection will block waiting for the buffer lock, give up,
|
||||
# and leave behind another backend blocked indefinitely.
|
||||
#
|
||||
# The situation unravels when the original backend doing the GetPage request finally
|
||||
# gets a response, or it gets confirmation that the TCP connection is lost.
|
||||
#
|
||||
# This test reproduces the traffic jam using a MITM proxy between pageserver and compute,
|
||||
# and forcing one GetPage request to get stuck.
|
||||
#
|
||||
# Recommendations:
|
||||
# - set client_connection_check_interval = '10s'. This makes Postgres wake up and check
|
||||
# for client connection loss. It's not perfect, it might not notice if the client has
|
||||
# e.g rebooted without sending a RST packet, but there's no downside
|
||||
#
|
||||
# - Add a timeout to GetPage requests. If no response is received from the pageserver
|
||||
# in, say, 10 s, terminate the pageserver connection and retry. XXX: Negotiate this
|
||||
# behavior with the storage team
|
||||
#
|
||||
#
|
||||
@pytest.mark.timeout(120)
|
||||
def test_compute_pageserver_connection_stress2(
|
||||
neon_env_builder: NeonEnvBuilder, port_distributor: PortDistributor, pg_bin: PgBin
|
||||
):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Set up the MITM proxy
|
||||
|
||||
global error_fraction
|
||||
global delay_fraction
|
||||
|
||||
error_fraction = 0
|
||||
delay_fraction = 0
|
||||
|
||||
async def response_cb(conn_id):
|
||||
global delay_fraction
|
||||
global error_fraction
|
||||
|
||||
if random.random() < error_fraction:
|
||||
raise BreakConnectionException("unlucky")
|
||||
|
||||
orig_delay_fraction = delay_fraction
|
||||
if random.random() < delay_fraction:
|
||||
delay_fraction = 0
|
||||
log.info(f"[{conn_id}] making getpage request STUCK")
|
||||
try:
|
||||
await asyncio.sleep(300)
|
||||
finally:
|
||||
delay_fraction = orig_delay_fraction
|
||||
log.info(f"[{conn_id}] delay finished")
|
||||
|
||||
mitm_listen_port = port_distributor.get_port()
|
||||
mitm = PageserverProxy(mitm_listen_port, env.pageserver.service_port.pg, response_cb)
|
||||
|
||||
def main():
|
||||
global error_fraction, delay_fraction
|
||||
endpoint = env.endpoints.create(
|
||||
"main",
|
||||
config_lines=[
|
||||
"max_connections=1000",
|
||||
"shared_buffers=8MB",
|
||||
"log_connections=on",
|
||||
"log_disconnections=on",
|
||||
],
|
||||
)
|
||||
endpoint.start()
|
||||
|
||||
with open(endpoint.pg_data_dir_path() / "postgresql.conf", "a") as conf:
|
||||
conf.write(
|
||||
f"neon.pageserver_connstring='postgres://localhost:{mitm_listen_port}' # MITM proxy\n"
|
||||
)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("select pg_reload_conf()")
|
||||
|
||||
scale = 5
|
||||
connstr = endpoint.connstr()
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
|
||||
error_fraction = 0.001
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", f"-s{scale}", connstr])
|
||||
error_fraction = 0.00
|
||||
delay_fraction = 0.001
|
||||
|
||||
cur.execute("select max(aid) from pgbench_accounts")
|
||||
num_accounts = 100000 * scale
|
||||
|
||||
num_clients = 200
|
||||
|
||||
app = WalkingApplication(num_accounts, num_clients, endpoint, 1000000)
|
||||
asyncio.run(app.run())
|
||||
|
||||
mitm.shutdown()
|
||||
|
||||
async def mm():
|
||||
await asyncio.gather(asyncio.to_thread(main), mitm.run_server())
|
||||
|
||||
asyncio.run(mm())
|
||||
|
||||
# do a graceful shutdown which would had caught the allowed_errors before
|
||||
# https://github.com/neondatabase/neon/pull/8632
|
||||
env.pageserver.stop()
|
||||
|
||||
|
||||
class WalkingApplication:
|
||||
"""
|
||||
A test application with following characteristics:
|
||||
|
||||
- It performs single-row lookups in pgbench_accounts table, just like pgbench -S
|
||||
|
||||
- Whenever a query takes longer than 10s, the application disconnects, reconnects,
|
||||
and retries the query, with the same parameter. This way, if there's a problem
|
||||
with a single page, the application will keep retrying it rather than work
|
||||
around it.
|
||||
|
||||
- The lookups are not randomly distributed, but form a "walking herd" pattern,
|
||||
where the queries walk through all accounts, with some randomness. This way,
|
||||
there's a lot of locality of access, but the locality moves throughout the
|
||||
table.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, num_accounts, num_clients, endpoint, num_xacts):
|
||||
self.num_accounts = num_accounts
|
||||
self.num_clients = num_clients
|
||||
self.endpoint = endpoint
|
||||
self.running = True
|
||||
self.num_xacts = num_xacts
|
||||
|
||||
self.xacts_started = 0
|
||||
self.xacts_performed = 0
|
||||
self.xacts_failed = 0
|
||||
|
||||
async def run(self):
|
||||
async with TaskGroup() as group:
|
||||
for i in range(1, self.num_clients):
|
||||
group.create_task(self.walking_client(i))
|
||||
|
||||
async def walking_client(self, client_id):
|
||||
local_xacts_performed = 0
|
||||
|
||||
conn = None
|
||||
stmt = None
|
||||
failed = False
|
||||
while self.running and self.xacts_started < self.num_xacts:
|
||||
self.xacts_started += 1
|
||||
if not failed:
|
||||
aid = (self.xacts_started * 100 + random.randint(0, 100)) % self.num_accounts + 1
|
||||
|
||||
if conn is None:
|
||||
conn = await self.endpoint.connect_async()
|
||||
await conn.execute("set statement_timeout=0")
|
||||
stmt = await conn.prepare("SELECT abalance FROM pgbench_accounts WHERE aid = $1")
|
||||
|
||||
try:
|
||||
async with asyncio.timeout(10):
|
||||
res = await stmt.fetchval(aid)
|
||||
if local_xacts_performed % 1000 == 0:
|
||||
log.info(
|
||||
f"[{client_id}] result {self.xacts_performed}/{self.num_xacts}: balance of account {aid}: {res}"
|
||||
)
|
||||
self.xacts_performed += 1
|
||||
local_xacts_performed += 1
|
||||
failed = False
|
||||
except TimeoutError:
|
||||
log.info(f"[{client_id}] query on aid {aid} timed out. Reconnecting")
|
||||
conn.terminate()
|
||||
conn = None
|
||||
failed = True
|
||||
|
||||
@@ -15,7 +15,7 @@ from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.utils import skip_in_debug_build, wait_until
|
||||
from fixtures.workload import Workload
|
||||
|
||||
AGGRESSIVE_COMPACTION_TENANT_CONF = {
|
||||
AGGRESIVE_COMPACTION_TENANT_CONF = {
|
||||
# Disable gc and compaction. The test runs compaction manually.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
@@ -24,7 +24,6 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
|
||||
# Compact small layers
|
||||
"compaction_target_size": 1024**2,
|
||||
"image_creation_threshold": 2,
|
||||
# "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later
|
||||
}
|
||||
|
||||
|
||||
@@ -52,7 +51,7 @@ def test_pageserver_compaction_smoke(
|
||||
page_cache_size=10
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -121,25 +120,14 @@ page_cache_size=10
|
||||
assert vectored_average < 8
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
SMOKE_CONF = {
|
||||
# Run both gc and gc-compaction.
|
||||
"gc_period": "5s",
|
||||
"compaction_period": "5s",
|
||||
# No PiTR interval and small GC horizon
|
||||
"pitr_interval": "0s",
|
||||
"gc_horizon": f"{1024 ** 2}",
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
row_count = 10000
|
||||
churn_rounds = 50
|
||||
row_count = 1000
|
||||
churn_rounds = 10
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
@@ -153,35 +141,23 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
if i % 10 == 0:
|
||||
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
||||
|
||||
# Run gc-compaction every 10 rounds to ensure the test doesn't take too long time.
|
||||
ps_http.timeline_compact(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
enhanced_gc_bottom_most_compaction=True,
|
||||
body={
|
||||
"scheduled": True,
|
||||
"sub_compaction": True,
|
||||
"compact_range": {
|
||||
"start": "000000000000000000000000000000000000",
|
||||
"end": "030000000000000000000000000000000000",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
workload.churn_rows(row_count, env.pageserver.id)
|
||||
|
||||
# ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
|
||||
env.pageserver.assert_log_contains(
|
||||
"scheduled_compact_timeline.*picked .* layers for compaction"
|
||||
)
|
||||
# Force L0 compaction to ensure the number of layers is within bounds, so that gc-compaction can run.
|
||||
ps_http.timeline_compact(tenant_id, timeline_id, force_l0_compaction=True)
|
||||
assert ps_http.perf_info(tenant_id, timeline_id)[0]["num_of_l0"] <= 1
|
||||
ps_http.timeline_compact(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
enhanced_gc_bottom_most_compaction=True,
|
||||
body={
|
||||
"start": "000000000000000000000000000000000000",
|
||||
"end": "030000000000000000000000000000000000",
|
||||
},
|
||||
)
|
||||
|
||||
log.info("Validating at workload end ...")
|
||||
workload.validate(env.pageserver.id)
|
||||
|
||||
# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
|
||||
ps_http.timeline_gc(tenant_id, timeline_id, None)
|
||||
|
||||
|
||||
# Stripe sizes in number of pages.
|
||||
TINY_STRIPES = 16
|
||||
|
||||
@@ -215,7 +215,7 @@ if SQL_EXPORTER is None:
|
||||
#
|
||||
# The "host" network mode allows sql_exporter to talk to the
|
||||
# endpoint which is running on the host.
|
||||
super().__init__("docker.io/burningalchemist/sql_exporter:0.16.0", network_mode="host")
|
||||
super().__init__("docker.io/burningalchemist/sql_exporter:0.13.1", network_mode="host")
|
||||
|
||||
self.__logs_dir = logs_dir
|
||||
self.__port = port
|
||||
|
||||
@@ -3230,55 +3230,3 @@ def test_multi_attached_timeline_creation(neon_env_builder: NeonEnvBuilder, migr
|
||||
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
|
||||
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
|
||||
raise
|
||||
|
||||
|
||||
@run_only_on_default_postgres("Postgres version makes no difference here")
|
||||
def test_storage_controller_detached_stopped(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Test that detaching a tenant while it has scheduling policy set to Paused or Stop works
|
||||
"""
|
||||
|
||||
remote_storage_kind = s3_storage()
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
neon_env_builder.num_pageservers = 1
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
env.storage_controller.tenant_create(
|
||||
tenant_id,
|
||||
shard_count=1,
|
||||
)
|
||||
|
||||
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
|
||||
|
||||
# Disable scheduling: ordinarily this would prevent the tenant's configuration being
|
||||
# reconciled to pageservers, but this should be overridden when detaching.
|
||||
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy.*")
|
||||
env.storage_controller.tenant_policy_update(
|
||||
tenant_id,
|
||||
{"scheduling": "Stop"},
|
||||
)
|
||||
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# Detach the tenant
|
||||
virtual_ps_http.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Detached",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": None,
|
||||
},
|
||||
)
|
||||
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# Confirm the detach happened
|
||||
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []
|
||||
|
||||
@@ -4,7 +4,7 @@ import time
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, fork_at_current_lsn
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
@@ -292,76 +292,3 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
|
||||
tup = cur.fetchall()
|
||||
log.info(f"tuple = {tup}")
|
||||
cur.execute("commit transaction")
|
||||
|
||||
|
||||
def test_check_visibility_map(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
"""
|
||||
Runs pgbench across a few databases on a sharded tenant, then performs a visibility map
|
||||
consistency check. Regression test for https://github.com/neondatabase/neon/issues/9914.
|
||||
"""
|
||||
|
||||
# Use a large number of shards with small stripe sizes, to ensure the visibility
|
||||
# map will end up on non-zero shards.
|
||||
SHARD_COUNT = 8
|
||||
STRIPE_SIZE = 32 # in 8KB pages
|
||||
PGBENCH_RUNS = 4
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=SHARD_COUNT, initial_tenant_shard_stripe_size=STRIPE_SIZE
|
||||
)
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
"shared_buffers = 64MB",
|
||||
],
|
||||
)
|
||||
|
||||
# Run pgbench in 4 different databases, to exercise different shards.
|
||||
dbnames = [f"pgbench{i}" for i in range(PGBENCH_RUNS)]
|
||||
for i, dbname in enumerate(dbnames):
|
||||
log.info(f"pgbench run {i+1}/{PGBENCH_RUNS}")
|
||||
endpoint.safe_psql(f"create database {dbname}")
|
||||
connstr = endpoint.connstr(dbname=dbname)
|
||||
# pgbench -i will automatically vacuum the tables. This creates the visibility map.
|
||||
pg_bin.run(["pgbench", "-i", "-s", "10", connstr])
|
||||
# Freeze the tuples to set the initial frozen bit.
|
||||
endpoint.safe_psql("vacuum freeze", dbname=dbname)
|
||||
# Run pgbench.
|
||||
pg_bin.run(["pgbench", "-c", "32", "-j", "8", "-T", "10", connstr])
|
||||
|
||||
# Restart the endpoint to flush the compute page cache. We want to make sure we read VM pages
|
||||
# from storage, not cache.
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
# Check that the visibility map matches the heap contents for pg_accounts (the main table).
|
||||
for dbname in dbnames:
|
||||
log.info(f"Checking visibility map for {dbname}")
|
||||
with endpoint.cursor(dbname=dbname) as cur:
|
||||
cur.execute("create extension pg_visibility")
|
||||
|
||||
cur.execute("select count(*) from pg_check_visible('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (visible)"
|
||||
|
||||
cur.execute("select count(*) from pg_check_frozen('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (frozen)"
|
||||
|
||||
# Vacuum and freeze the tables, and check that the visibility map is still accurate.
|
||||
for dbname in dbnames:
|
||||
log.info(f"Vacuuming and checking visibility map for {dbname}")
|
||||
with endpoint.cursor(dbname=dbname) as cur:
|
||||
cur.execute("vacuum freeze")
|
||||
|
||||
cur.execute("select count(*) from pg_check_visible('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (visible)"
|
||||
|
||||
cur.execute("select count(*) from pg_check_frozen('pgbench_accounts')")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 0, f"{row[0]} inconsistent VM pages (frozen)"
|
||||
|
||||
@@ -33,7 +33,6 @@ deranged = { version = "0.3", default-features = false, features = ["powerfmt",
|
||||
digest = { version = "0.10", features = ["mac", "oid", "std"] }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
form_urlencoded = { version = "1" }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
futures-executor = { version = "0.3" }
|
||||
futures-io = { version = "0.3" }
|
||||
@@ -79,7 +78,6 @@ sha2 = { version = "0.10", features = ["asm", "oid"] }
|
||||
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
|
||||
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
|
||||
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
|
||||
stable_deref_trait = { version = "1" }
|
||||
subtle = { version = "2" }
|
||||
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
|
||||
@@ -107,7 +105,6 @@ anyhow = { version = "1", features = ["backtrace"] }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
cc = { version = "1", default-features = false, features = ["parallel"] }
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
|
||||
displaydoc = { version = "0.2" }
|
||||
either = { version = "1" }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
half = { version = "2", default-features = false, features = ["num-traits"] }
|
||||
|
||||
Reference in New Issue
Block a user