mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-07 18:30:37 +00:00
Compare commits
16 Commits
conrad/pro
...
arpad/blob
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
229157e323 | ||
|
|
b593e51eae | ||
|
|
4c4cb80186 | ||
|
|
92273b6d5e | ||
|
|
e74e7aac93 | ||
|
|
4cca5cdb12 | ||
|
|
9d425b54f7 | ||
|
|
ec790870d5 | ||
|
|
4d7111f240 | ||
|
|
b1fd086c0c | ||
|
|
b6eea65597 | ||
|
|
c42c28b339 | ||
|
|
e4837b0a5a | ||
|
|
14c4fae64a | ||
|
|
cc70fc802d | ||
|
|
fa07097f2f |
2
.github/actionlint.yml
vendored
2
.github/actionlint.yml
vendored
@@ -21,3 +21,5 @@ config-variables:
|
|||||||
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
|
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
|
||||||
- DEV_AWS_OIDC_ROLE_ARN
|
- DEV_AWS_OIDC_ROLE_ARN
|
||||||
- BENCHMARK_INGEST_TARGET_PROJECTID
|
- BENCHMARK_INGEST_TARGET_PROJECTID
|
||||||
|
- PGREGRESS_PG16_PROJECT_ID
|
||||||
|
- PGREGRESS_PG17_PROJECT_ID
|
||||||
|
|||||||
32
.github/workflows/cloud-regress.yml
vendored
32
.github/workflows/cloud-regress.yml
vendored
@@ -23,11 +23,14 @@ jobs:
|
|||||||
regress:
|
regress:
|
||||||
env:
|
env:
|
||||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||||
DEFAULT_PG_VERSION: 16
|
|
||||||
TEST_OUTPUT: /tmp/test_output
|
TEST_OUTPUT: /tmp/test_output
|
||||||
BUILD_TYPE: remote
|
BUILD_TYPE: remote
|
||||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||||
|
strategy:
|
||||||
|
fail-fast: false
|
||||||
|
matrix:
|
||||||
|
pg-version: [16, 17]
|
||||||
|
|
||||||
runs-on: us-east-2
|
runs-on: us-east-2
|
||||||
container:
|
container:
|
||||||
@@ -40,9 +43,11 @@ jobs:
|
|||||||
submodules: true
|
submodules: true
|
||||||
|
|
||||||
- name: Patch the test
|
- name: Patch the test
|
||||||
|
env:
|
||||||
|
PG_VERSION: ${{matrix.pg-version}}
|
||||||
run: |
|
run: |
|
||||||
cd "vendor/postgres-v${DEFAULT_PG_VERSION}"
|
cd "vendor/postgres-v${PG_VERSION}"
|
||||||
patch -p1 < "../../compute/patches/cloud_regress_pg${DEFAULT_PG_VERSION}.patch"
|
patch -p1 < "../../compute/patches/cloud_regress_pg${PG_VERSION}.patch"
|
||||||
|
|
||||||
- name: Generate a random password
|
- name: Generate a random password
|
||||||
id: pwgen
|
id: pwgen
|
||||||
@@ -55,8 +60,9 @@ jobs:
|
|||||||
- name: Change tests according to the generated password
|
- name: Change tests according to the generated password
|
||||||
env:
|
env:
|
||||||
DBPASS: ${{ steps.pwgen.outputs.DBPASS }}
|
DBPASS: ${{ steps.pwgen.outputs.DBPASS }}
|
||||||
|
PG_VERSION: ${{matrix.pg-version}}
|
||||||
run: |
|
run: |
|
||||||
cd vendor/postgres-v"${DEFAULT_PG_VERSION}"/src/test/regress
|
cd vendor/postgres-v"${PG_VERSION}"/src/test/regress
|
||||||
for fname in sql/*.sql expected/*.out; do
|
for fname in sql/*.sql expected/*.out; do
|
||||||
sed -i.bak s/NEON_PASSWORD_PLACEHOLDER/"'${DBPASS}'"/ "${fname}"
|
sed -i.bak s/NEON_PASSWORD_PLACEHOLDER/"'${DBPASS}'"/ "${fname}"
|
||||||
done
|
done
|
||||||
@@ -73,15 +79,29 @@ jobs:
|
|||||||
path: /tmp/neon/
|
path: /tmp/neon/
|
||||||
prefix: latest
|
prefix: latest
|
||||||
|
|
||||||
|
- name: Create a new branch
|
||||||
|
id: create-branch
|
||||||
|
uses: ./.github/actions/neon-branch-create
|
||||||
|
with:
|
||||||
|
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||||
|
project_id: ${{ vars[format('PGREGRESS_PG{0}_PROJECT_ID', matrix.pg-version)] }}
|
||||||
|
|
||||||
- name: Run the regression tests
|
- name: Run the regression tests
|
||||||
uses: ./.github/actions/run-python-test-set
|
uses: ./.github/actions/run-python-test-set
|
||||||
with:
|
with:
|
||||||
build_type: ${{ env.BUILD_TYPE }}
|
build_type: ${{ env.BUILD_TYPE }}
|
||||||
test_selection: cloud_regress
|
test_selection: cloud_regress
|
||||||
pg_version: ${{ env.DEFAULT_PG_VERSION }}
|
pg_version: ${{matrix.pg-version}}
|
||||||
extra_params: -m remote_cluster
|
extra_params: -m remote_cluster
|
||||||
env:
|
env:
|
||||||
BENCHMARK_CONNSTR: ${{ secrets.PG_REGRESS_CONNSTR }}
|
BENCHMARK_CONNSTR: ${{steps.create-branch.outputs.dsn}}
|
||||||
|
|
||||||
|
- name: Delete branch
|
||||||
|
uses: ./.github/actions/neon-branch-delete
|
||||||
|
with:
|
||||||
|
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||||
|
project_id: ${{ vars[format('PGREGRESS_PG{0}_PROJECT_ID', matrix.pg-version)] }}
|
||||||
|
branch_id: ${{steps.create-branch.outputs.branch_id}}
|
||||||
|
|
||||||
- name: Create Allure report
|
- name: Create Allure report
|
||||||
id: create-allure-report
|
id: create-allure-report
|
||||||
|
|||||||
33
CODEOWNERS
33
CODEOWNERS
@@ -1,16 +1,29 @@
|
|||||||
/.github/ @neondatabase/developer-productivity
|
# Autoscaling
|
||||||
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
|
|
||||||
/libs/pageserver_api/ @neondatabase/storage
|
|
||||||
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
|
|
||||||
/libs/proxy/ @neondatabase/proxy
|
|
||||||
/libs/remote_storage/ @neondatabase/storage
|
|
||||||
/libs/safekeeper_api/ @neondatabase/storage
|
|
||||||
/libs/vm_monitor/ @neondatabase/autoscaling
|
/libs/vm_monitor/ @neondatabase/autoscaling
|
||||||
/pageserver/ @neondatabase/storage
|
|
||||||
|
# DevProd
|
||||||
|
/.github/ @neondatabase/developer-productivity
|
||||||
|
|
||||||
|
# Compute
|
||||||
/pgxn/ @neondatabase/compute
|
/pgxn/ @neondatabase/compute
|
||||||
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
|
/vendor/ @neondatabase/compute
|
||||||
|
/compute/ @neondatabase/compute
|
||||||
|
/compute_tools/ @neondatabase/compute
|
||||||
|
|
||||||
|
# Proxy
|
||||||
|
/libs/proxy/ @neondatabase/proxy
|
||||||
/proxy/ @neondatabase/proxy
|
/proxy/ @neondatabase/proxy
|
||||||
|
|
||||||
|
# Storage
|
||||||
|
/pageserver/ @neondatabase/storage
|
||||||
/safekeeper/ @neondatabase/storage
|
/safekeeper/ @neondatabase/storage
|
||||||
/storage_controller @neondatabase/storage
|
/storage_controller @neondatabase/storage
|
||||||
/storage_scrubber @neondatabase/storage
|
/storage_scrubber @neondatabase/storage
|
||||||
/vendor/ @neondatabase/compute
|
/libs/pageserver_api/ @neondatabase/storage
|
||||||
|
/libs/remote_storage/ @neondatabase/storage
|
||||||
|
/libs/safekeeper_api/ @neondatabase/storage
|
||||||
|
|
||||||
|
# Shared
|
||||||
|
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
|
||||||
|
/libs/compute_api/ @neondatabase/compute @neondatabase/control-plane
|
||||||
|
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
|
||||||
|
|||||||
515
Cargo.lock
generated
515
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
10
Cargo.toml
10
Cargo.toml
@@ -51,10 +51,6 @@ anyhow = { version = "1.0", features = ["backtrace"] }
|
|||||||
arc-swap = "1.6"
|
arc-swap = "1.6"
|
||||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||||
atomic-take = "1.1.0"
|
atomic-take = "1.1.0"
|
||||||
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
|
|
||||||
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
|
|
||||||
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
|
|
||||||
azure_storage_blobs = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
|
|
||||||
flate2 = "1.0.26"
|
flate2 = "1.0.26"
|
||||||
async-stream = "0.3"
|
async-stream = "0.3"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
@@ -216,6 +212,12 @@ postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git",
|
|||||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||||
|
|
||||||
|
## Azure SDK crates
|
||||||
|
azure_core = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
|
||||||
|
azure_identity = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||||
|
azure_storage = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||||
|
azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "arpad/blob_batch", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||||
|
|
||||||
## Local libraries
|
## Local libraries
|
||||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||||
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
||||||
|
|||||||
@@ -115,7 +115,7 @@ RUN set -e \
|
|||||||
|
|
||||||
# Keep the version the same as in compute/compute-node.Dockerfile and
|
# Keep the version the same as in compute/compute-node.Dockerfile and
|
||||||
# test_runner/regress/test_compute_metrics.py.
|
# test_runner/regress/test_compute_metrics.py.
|
||||||
ENV SQL_EXPORTER_VERSION=0.13.1
|
ENV SQL_EXPORTER_VERSION=0.16.0
|
||||||
RUN curl -fsSL \
|
RUN curl -fsSL \
|
||||||
"https://github.com/burningalchemist/sql_exporter/releases/download/${SQL_EXPORTER_VERSION}/sql_exporter-${SQL_EXPORTER_VERSION}.linux-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac).tar.gz" \
|
"https://github.com/burningalchemist/sql_exporter/releases/download/${SQL_EXPORTER_VERSION}/sql_exporter-${SQL_EXPORTER_VERSION}.linux-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac).tar.gz" \
|
||||||
--output sql_exporter.tar.gz \
|
--output sql_exporter.tar.gz \
|
||||||
|
|||||||
@@ -1324,7 +1324,7 @@ FROM quay.io/prometheuscommunity/postgres-exporter:v0.12.1 AS postgres-exporter
|
|||||||
|
|
||||||
# Keep the version the same as in build-tools.Dockerfile and
|
# Keep the version the same as in build-tools.Dockerfile and
|
||||||
# test_runner/regress/test_compute_metrics.py.
|
# test_runner/regress/test_compute_metrics.py.
|
||||||
FROM burningalchemist/sql_exporter:0.13.1 AS sql-exporter
|
FROM burningalchemist/sql_exporter:0.16.0 AS sql-exporter
|
||||||
|
|
||||||
#########################################################################################
|
#########################################################################################
|
||||||
#
|
#
|
||||||
|
|||||||
4047
compute/patches/cloud_regress_pg17.patch
Normal file
4047
compute/patches/cloud_regress_pg17.patch
Normal file
File diff suppressed because it is too large
Load Diff
@@ -42,6 +42,7 @@ allow = [
|
|||||||
"MPL-2.0",
|
"MPL-2.0",
|
||||||
"OpenSSL",
|
"OpenSSL",
|
||||||
"Unicode-DFS-2016",
|
"Unicode-DFS-2016",
|
||||||
|
"Unicode-3.0",
|
||||||
]
|
]
|
||||||
confidence-threshold = 0.8
|
confidence-threshold = 0.8
|
||||||
exceptions = [
|
exceptions = [
|
||||||
|
|||||||
@@ -245,6 +245,17 @@ impl From<NodeAvailability> for NodeAvailabilityWrapper {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Scheduling policy enables us to selectively disable some automatic actions that the
|
||||||
|
/// controller performs on a tenant shard. This is only set to a non-default value by
|
||||||
|
/// human intervention, and it is reset to the default value (Active) when the tenant's
|
||||||
|
/// placement policy is modified away from Attached.
|
||||||
|
///
|
||||||
|
/// The typical use of a non-Active scheduling policy is one of:
|
||||||
|
/// - Pinnning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
|
||||||
|
/// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
|
||||||
|
///
|
||||||
|
/// If you're not sure which policy to use to pin a shard to its current location, you probably
|
||||||
|
/// want Pause.
|
||||||
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
|
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
|
||||||
pub enum ShardSchedulingPolicy {
|
pub enum ShardSchedulingPolicy {
|
||||||
// Normal mode: the tenant's scheduled locations may be updated at will, including
|
// Normal mode: the tenant's scheduled locations may be updated at will, including
|
||||||
|
|||||||
@@ -8,15 +8,14 @@ use std::io;
|
|||||||
use std::num::NonZeroU32;
|
use std::num::NonZeroU32;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
|
|
||||||
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
|
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
|
||||||
|
use anyhow::Context;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
|
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
|
||||||
use azure_core::{Continuable, RetryOptions};
|
use azure_core::{Continuable, RetryOptions};
|
||||||
use azure_identity::DefaultAzureCredential;
|
|
||||||
use azure_storage::StorageCredentials;
|
use azure_storage::StorageCredentials;
|
||||||
use azure_storage_blobs::blob::CopyStatus;
|
use azure_storage_blobs::blob::CopyStatus;
|
||||||
use azure_storage_blobs::prelude::ClientBuilder;
|
use azure_storage_blobs::prelude::ClientBuilder;
|
||||||
@@ -76,8 +75,9 @@ impl AzureBlobStorage {
|
|||||||
let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
|
let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
|
||||||
StorageCredentials::access_key(account.clone(), access_key)
|
StorageCredentials::access_key(account.clone(), access_key)
|
||||||
} else {
|
} else {
|
||||||
let token_credential = DefaultAzureCredential::default();
|
let token_credential = azure_identity::create_default_credential()
|
||||||
StorageCredentials::token_credential(Arc::new(token_credential))
|
.context("trying to obtain Azure default credentials")?;
|
||||||
|
StorageCredentials::token_credential(token_credential)
|
||||||
};
|
};
|
||||||
|
|
||||||
// we have an outer retry
|
// we have an outer retry
|
||||||
@@ -556,7 +556,7 @@ impl RemoteStorage for AzureBlobStorage {
|
|||||||
let op = async {
|
let op = async {
|
||||||
// TODO batch requests are not supported by the SDK
|
// TODO batch requests are not supported by the SDK
|
||||||
// https://github.com/Azure/azure-sdk-for-rust/issues/1068
|
// https://github.com/Azure/azure-sdk-for-rust/issues/1068
|
||||||
for path in paths {
|
for path_chunk in paths.chunks(256) {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum AzureOrTimeout {
|
enum AzureOrTimeout {
|
||||||
AzureError(azure_core::Error),
|
AzureError(azure_core::Error),
|
||||||
@@ -572,13 +572,20 @@ impl RemoteStorage for AzureBlobStorage {
|
|||||||
let max_retries = 5;
|
let max_retries = 5;
|
||||||
backoff::retry(
|
backoff::retry(
|
||||||
|| async {
|
|| async {
|
||||||
let blob_client = self.client.blob_client(self.relative_path_to_name(path));
|
let mut batch_client = self.client.blob_batch();
|
||||||
|
for path in path_chunk {
|
||||||
|
batch_client = match batch_client.delete(self.relative_path_to_name(path)) {
|
||||||
|
Ok(batch_client) => batch_client,
|
||||||
|
Err(e) => return Err(AzureOrTimeout::AzureError(e)),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
let request = blob_client.delete().into_future();
|
let request = batch_client.into_future();
|
||||||
|
|
||||||
let res = tokio::time::timeout(self.timeout, request).await;
|
let res = tokio::time::timeout(self.timeout, request).await;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
|
// TODO: validate that all deletions were successful
|
||||||
Ok(Ok(_v)) => Ok(()),
|
Ok(Ok(_v)) => Ok(()),
|
||||||
Ok(Err(azure_err)) => {
|
Ok(Err(azure_err)) => {
|
||||||
if let Some(http_err) = azure_err.as_http_error() {
|
if let Some(http_err) = azure_err.as_http_error() {
|
||||||
|
|||||||
@@ -2036,15 +2036,23 @@ async fn timeline_compact_handler(
|
|||||||
parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
|
parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
|
||||||
.unwrap_or(false);
|
.unwrap_or(false);
|
||||||
|
|
||||||
|
let sub_compaction = compact_request
|
||||||
|
.as_ref()
|
||||||
|
.map(|r| r.sub_compaction)
|
||||||
|
.unwrap_or(false);
|
||||||
let options = CompactOptions {
|
let options = CompactOptions {
|
||||||
compact_range: compact_request
|
compact_range: compact_request
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|r| r.compact_range.clone()),
|
.and_then(|r| r.compact_range.clone()),
|
||||||
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
|
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
|
||||||
flags,
|
flags,
|
||||||
|
sub_compaction,
|
||||||
};
|
};
|
||||||
|
|
||||||
let scheduled = compact_request.map(|r| r.scheduled).unwrap_or(false);
|
let scheduled = compact_request
|
||||||
|
.as_ref()
|
||||||
|
.map(|r| r.scheduled)
|
||||||
|
.unwrap_or(false);
|
||||||
|
|
||||||
async {
|
async {
|
||||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||||
@@ -2053,7 +2061,7 @@ async fn timeline_compact_handler(
|
|||||||
let tenant = state
|
let tenant = state
|
||||||
.tenant_manager
|
.tenant_manager
|
||||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||||
let rx = tenant.schedule_compaction(timeline_id, options).await;
|
let rx = tenant.schedule_compaction(timeline_id, options).await.map_err(ApiError::InternalServerError)?;
|
||||||
if wait_until_scheduled_compaction_done {
|
if wait_until_scheduled_compaction_done {
|
||||||
// It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
|
// It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
|
||||||
rx.await.ok();
|
rx.await.ok();
|
||||||
|
|||||||
@@ -1223,31 +1223,60 @@ pub(crate) mod virtual_file_io_engine {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct SmgrOpTimer {
|
pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
|
||||||
|
pub(crate) struct SmgrOpTimerInner {
|
||||||
global_latency_histo: Histogram,
|
global_latency_histo: Histogram,
|
||||||
|
|
||||||
// Optional because not all op types are tracked per-timeline
|
// Optional because not all op types are tracked per-timeline
|
||||||
per_timeline_latency_histo: Option<Histogram>,
|
per_timeline_latency_histo: Option<Histogram>,
|
||||||
|
|
||||||
|
global_flush_in_progress_micros: IntCounter,
|
||||||
|
per_timeline_flush_in_progress_micros: IntCounter,
|
||||||
|
|
||||||
start: Instant,
|
start: Instant,
|
||||||
throttled: Duration,
|
throttled: Duration,
|
||||||
op: SmgrQueryType,
|
op: SmgrQueryType,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) struct SmgrOpFlushInProgress {
|
||||||
|
base: Instant,
|
||||||
|
global_micros: IntCounter,
|
||||||
|
per_timeline_micros: IntCounter,
|
||||||
|
}
|
||||||
|
|
||||||
impl SmgrOpTimer {
|
impl SmgrOpTimer {
|
||||||
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
|
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
|
||||||
let Some(throttle) = throttle else {
|
let Some(throttle) = throttle else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
self.throttled += *throttle;
|
let inner = self.0.as_mut().expect("other public methods consume self");
|
||||||
|
inner.throttled += *throttle;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for SmgrOpTimer {
|
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
|
||||||
fn drop(&mut self) {
|
let (flush_start, inner) = self
|
||||||
let elapsed = self.start.elapsed();
|
.smgr_op_end()
|
||||||
|
.expect("this method consume self, and the only other caller is drop handler");
|
||||||
|
let SmgrOpTimerInner {
|
||||||
|
global_flush_in_progress_micros,
|
||||||
|
per_timeline_flush_in_progress_micros,
|
||||||
|
..
|
||||||
|
} = inner;
|
||||||
|
SmgrOpFlushInProgress {
|
||||||
|
base: flush_start,
|
||||||
|
global_micros: global_flush_in_progress_micros,
|
||||||
|
per_timeline_micros: per_timeline_flush_in_progress_micros,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let elapsed = match elapsed.checked_sub(self.throttled) {
|
/// Returns `None`` if this method has already been called, `Some` otherwise.
|
||||||
|
fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
|
||||||
|
let inner = self.0.take()?;
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
let elapsed = now - inner.start;
|
||||||
|
|
||||||
|
let elapsed = match elapsed.checked_sub(inner.throttled) {
|
||||||
Some(elapsed) => elapsed,
|
Some(elapsed) => elapsed,
|
||||||
None => {
|
None => {
|
||||||
use utils::rate_limit::RateLimit;
|
use utils::rate_limit::RateLimit;
|
||||||
@@ -1258,9 +1287,9 @@ impl Drop for SmgrOpTimer {
|
|||||||
})))
|
})))
|
||||||
});
|
});
|
||||||
let mut guard = LOGGED.lock().unwrap();
|
let mut guard = LOGGED.lock().unwrap();
|
||||||
let rate_limit = &mut guard[self.op];
|
let rate_limit = &mut guard[inner.op];
|
||||||
rate_limit.call(|| {
|
rate_limit.call(|| {
|
||||||
warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
|
warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
|
||||||
});
|
});
|
||||||
elapsed // un-throttled time, more info than just saturating to 0
|
elapsed // un-throttled time, more info than just saturating to 0
|
||||||
}
|
}
|
||||||
@@ -1268,10 +1297,54 @@ impl Drop for SmgrOpTimer {
|
|||||||
|
|
||||||
let elapsed = elapsed.as_secs_f64();
|
let elapsed = elapsed.as_secs_f64();
|
||||||
|
|
||||||
self.global_latency_histo.observe(elapsed);
|
inner.global_latency_histo.observe(elapsed);
|
||||||
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
|
if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
|
||||||
per_timeline_getpage_histo.observe(elapsed);
|
per_timeline_getpage_histo.observe(elapsed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Some((now, inner))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for SmgrOpTimer {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.smgr_op_end();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SmgrOpFlushInProgress {
|
||||||
|
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
|
||||||
|
where
|
||||||
|
Fut: std::future::Future<Output = O>,
|
||||||
|
{
|
||||||
|
let mut fut = std::pin::pin!(fut);
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
// Whenever observe_guard gets called, or dropped,
|
||||||
|
// it adds the time elapsed since its last call to metrics.
|
||||||
|
// Last call is tracked in `now`.
|
||||||
|
let mut observe_guard = scopeguard::guard(
|
||||||
|
|| {
|
||||||
|
let elapsed = now - self.base;
|
||||||
|
self.global_micros
|
||||||
|
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||||
|
self.per_timeline_micros
|
||||||
|
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
|
||||||
|
self.base = now;
|
||||||
|
},
|
||||||
|
|mut observe| {
|
||||||
|
observe();
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
|
||||||
|
Ok(v) => return v,
|
||||||
|
Err(_timeout) => {
|
||||||
|
(*observe_guard)();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1302,6 +1375,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
|
|||||||
per_timeline_getpage_latency: Histogram,
|
per_timeline_getpage_latency: Histogram,
|
||||||
global_batch_size: Histogram,
|
global_batch_size: Histogram,
|
||||||
per_timeline_batch_size: Histogram,
|
per_timeline_batch_size: Histogram,
|
||||||
|
global_flush_in_progress_micros: IntCounter,
|
||||||
|
per_timeline_flush_in_progress_micros: IntCounter,
|
||||||
}
|
}
|
||||||
|
|
||||||
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||||
@@ -1464,6 +1539,26 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
|
|||||||
.set(value.try_into().unwrap());
|
.set(value.try_into().unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"pageserver_page_service_pagestream_flush_in_progress_micros",
|
||||||
|
"Counter that sums up the microseconds that a pagestream response was being flushed into the TCP connection. \
|
||||||
|
If the flush is particularly slow, this counter will be updated periodically to make slow flushes \
|
||||||
|
easily discoverable in monitoring. \
|
||||||
|
Hence, this is NOT a completion latency historgram.",
|
||||||
|
&["tenant_id", "shard_id", "timeline_id"],
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
});
|
||||||
|
|
||||||
|
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||||
|
register_int_counter!(
|
||||||
|
"pageserver_page_service_pagestream_flush_in_progress_micros_global",
|
||||||
|
"Like pageserver_page_service_pagestream_flush_in_progress_seconds, but instance-wide.",
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
});
|
||||||
|
|
||||||
impl SmgrQueryTimePerTimeline {
|
impl SmgrQueryTimePerTimeline {
|
||||||
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
|
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
|
||||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||||
@@ -1504,6 +1599,12 @@ impl SmgrQueryTimePerTimeline {
|
|||||||
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
|
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let global_flush_in_progress_micros =
|
||||||
|
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
|
||||||
|
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
|
||||||
|
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
global_started,
|
global_started,
|
||||||
global_latency,
|
global_latency,
|
||||||
@@ -1511,6 +1612,8 @@ impl SmgrQueryTimePerTimeline {
|
|||||||
per_timeline_getpage_started,
|
per_timeline_getpage_started,
|
||||||
global_batch_size,
|
global_batch_size,
|
||||||
per_timeline_batch_size,
|
per_timeline_batch_size,
|
||||||
|
global_flush_in_progress_micros,
|
||||||
|
per_timeline_flush_in_progress_micros,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
|
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
|
||||||
@@ -1523,13 +1626,17 @@ impl SmgrQueryTimePerTimeline {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
SmgrOpTimer {
|
SmgrOpTimer(Some(SmgrOpTimerInner {
|
||||||
global_latency_histo: self.global_latency[op as usize].clone(),
|
global_latency_histo: self.global_latency[op as usize].clone(),
|
||||||
per_timeline_latency_histo,
|
per_timeline_latency_histo,
|
||||||
start: started_at,
|
start: started_at,
|
||||||
op,
|
op,
|
||||||
throttled: Duration::ZERO,
|
throttled: Duration::ZERO,
|
||||||
}
|
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
|
||||||
|
per_timeline_flush_in_progress_micros: self
|
||||||
|
.per_timeline_flush_in_progress_micros
|
||||||
|
.clone(),
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
|
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
|
||||||
@@ -2204,6 +2311,15 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
|
|||||||
.expect("failed to define a metric"),
|
.expect("failed to define a metric"),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
pub(crate) static PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"pageserver_timeline_wal_records_received",
|
||||||
|
"Number of WAL records received per shard",
|
||||||
|
&["tenant_id", "shard_id", "timeline_id"]
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
});
|
||||||
|
|
||||||
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
|
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||||
register_histogram!(
|
register_histogram!(
|
||||||
"pageserver_wal_redo_seconds",
|
"pageserver_wal_redo_seconds",
|
||||||
@@ -2431,6 +2547,7 @@ pub(crate) struct TimelineMetrics {
|
|||||||
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
|
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
|
||||||
/// Number of valid LSN leases.
|
/// Number of valid LSN leases.
|
||||||
pub valid_lsn_lease_count_gauge: UIntGauge,
|
pub valid_lsn_lease_count_gauge: UIntGauge,
|
||||||
|
pub wal_records_received: IntCounter,
|
||||||
shutdown: std::sync::atomic::AtomicBool,
|
shutdown: std::sync::atomic::AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2588,6 +2705,10 @@ impl TimelineMetrics {
|
|||||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let wal_records_received = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED
|
||||||
|
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
TimelineMetrics {
|
TimelineMetrics {
|
||||||
tenant_id,
|
tenant_id,
|
||||||
shard_id,
|
shard_id,
|
||||||
@@ -2620,6 +2741,7 @@ impl TimelineMetrics {
|
|||||||
evictions_with_low_residence_duration,
|
evictions_with_low_residence_duration,
|
||||||
),
|
),
|
||||||
valid_lsn_lease_count_gauge,
|
valid_lsn_lease_count_gauge,
|
||||||
|
wal_records_received,
|
||||||
shutdown: std::sync::atomic::AtomicBool::default(),
|
shutdown: std::sync::atomic::AtomicBool::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -2757,6 +2879,16 @@ impl TimelineMetrics {
|
|||||||
shard_id,
|
shard_id,
|
||||||
timeline_id,
|
timeline_id,
|
||||||
]);
|
]);
|
||||||
|
let _ = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED.remove_label_values(&[
|
||||||
|
tenant_id,
|
||||||
|
shard_id,
|
||||||
|
timeline_id,
|
||||||
|
]);
|
||||||
|
let _ = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS.remove_label_values(&[
|
||||||
|
tenant_id,
|
||||||
|
shard_id,
|
||||||
|
timeline_id,
|
||||||
|
]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1017,10 +1017,8 @@ impl PageServerHandler {
|
|||||||
// Map handler result to protocol behavior.
|
// Map handler result to protocol behavior.
|
||||||
// Some handler errors cause exit from pagestream protocol.
|
// Some handler errors cause exit from pagestream protocol.
|
||||||
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
|
||||||
let mut timers: smallvec::SmallVec<[_; 1]> =
|
|
||||||
smallvec::SmallVec::with_capacity(handler_results.len());
|
|
||||||
for handler_result in handler_results {
|
for handler_result in handler_results {
|
||||||
let response_msg = match handler_result {
|
let (response_msg, timer) = match handler_result {
|
||||||
Err(e) => match &e {
|
Err(e) => match &e {
|
||||||
PageStreamError::Shutdown => {
|
PageStreamError::Shutdown => {
|
||||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||||
@@ -1044,34 +1042,66 @@ impl PageServerHandler {
|
|||||||
span.in_scope(|| {
|
span.in_scope(|| {
|
||||||
error!("error reading relation or page version: {full:#}")
|
error!("error reading relation or page version: {full:#}")
|
||||||
});
|
});
|
||||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
(
|
||||||
message: e.to_string(),
|
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||||
})
|
message: e.to_string(),
|
||||||
|
}),
|
||||||
|
None, // TODO: measure errors
|
||||||
|
)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok((response_msg, timer)) => {
|
Ok((response_msg, timer)) => (response_msg, Some(timer)),
|
||||||
// Extending the lifetime of the timers so observations on drop
|
|
||||||
// include the flush time.
|
|
||||||
timers.push(timer);
|
|
||||||
response_msg
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
//
|
||||||
// marshal & transmit response message
|
// marshal & transmit response message
|
||||||
|
//
|
||||||
|
|
||||||
pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
|
||||||
}
|
|
||||||
tokio::select! {
|
// We purposefully don't count flush time into the timer.
|
||||||
biased;
|
//
|
||||||
_ = cancel.cancelled() => {
|
// The reason is that current compute client will not perform protocol processing
|
||||||
// We were requested to shut down.
|
// if the postgres backend process is doing things other than `->smgr_read()`.
|
||||||
info!("shutdown request received in page handler");
|
// This is especially the case for prefetch.
|
||||||
return Err(QueryError::Shutdown)
|
//
|
||||||
}
|
// If the compute doesn't read from the connection, eventually TCP will backpressure
|
||||||
res = pgb_writer.flush() => {
|
// all the way into our flush call below.
|
||||||
res?;
|
//
|
||||||
|
// The timer's underlying metric is used for a storage-internal latency SLO and
|
||||||
|
// we don't want to include latency in it that we can't control.
|
||||||
|
// And as pointed out above, in this case, we don't control the time that flush will take.
|
||||||
|
let flushing_timer =
|
||||||
|
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
|
||||||
|
|
||||||
|
// what we want to do
|
||||||
|
let flush_fut = pgb_writer.flush();
|
||||||
|
// metric for how long flushing takes
|
||||||
|
let flush_fut = match flushing_timer {
|
||||||
|
Some(flushing_timer) => {
|
||||||
|
futures::future::Either::Left(flushing_timer.measure(flush_fut))
|
||||||
|
}
|
||||||
|
None => futures::future::Either::Right(flush_fut),
|
||||||
|
};
|
||||||
|
// do it while respecting cancellation
|
||||||
|
let _: () = async move {
|
||||||
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
|
// We were requested to shut down.
|
||||||
|
info!("shutdown request received in page handler");
|
||||||
|
return Err(QueryError::Shutdown)
|
||||||
|
}
|
||||||
|
res = flush_fut => {
|
||||||
|
res?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
// and log the info! line inside the request span
|
||||||
|
.instrument(span.clone())
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
drop(timers);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ use timeline::import_pgdata;
|
|||||||
use timeline::offload::offload_timeline;
|
use timeline::offload::offload_timeline;
|
||||||
use timeline::CompactFlags;
|
use timeline::CompactFlags;
|
||||||
use timeline::CompactOptions;
|
use timeline::CompactOptions;
|
||||||
|
use timeline::CompactionError;
|
||||||
use timeline::ShutdownMode;
|
use timeline::ShutdownMode;
|
||||||
use tokio::io::BufReader;
|
use tokio::io::BufReader;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
@@ -2987,10 +2988,16 @@ impl Tenant {
|
|||||||
if has_pending_l0_compaction_task {
|
if has_pending_l0_compaction_task {
|
||||||
Some(true)
|
Some(true)
|
||||||
} else {
|
} else {
|
||||||
let has_pending_scheduled_compaction_task;
|
let mut has_pending_scheduled_compaction_task;
|
||||||
let next_scheduled_compaction_task = {
|
let next_scheduled_compaction_task = {
|
||||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||||
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
|
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
|
||||||
|
if !tline_pending_tasks.is_empty() {
|
||||||
|
info!(
|
||||||
|
"{} tasks left in the compaction schedule queue",
|
||||||
|
tline_pending_tasks.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
let next_task = tline_pending_tasks.pop_front();
|
let next_task = tline_pending_tasks.pop_front();
|
||||||
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
|
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
|
||||||
next_task
|
next_task
|
||||||
@@ -3007,6 +3014,41 @@ impl Tenant {
|
|||||||
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
|
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
|
||||||
{
|
{
|
||||||
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
|
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
|
||||||
|
} else if next_scheduled_compaction_task.options.sub_compaction {
|
||||||
|
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
|
||||||
|
let jobs = timeline
|
||||||
|
.gc_compaction_split_jobs(next_scheduled_compaction_task.options)
|
||||||
|
.await
|
||||||
|
.map_err(CompactionError::Other)?;
|
||||||
|
if jobs.is_empty() {
|
||||||
|
info!("no jobs to run, skipping scheduled compaction task");
|
||||||
|
} else {
|
||||||
|
has_pending_scheduled_compaction_task = true;
|
||||||
|
let jobs_len = jobs.len();
|
||||||
|
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||||
|
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
|
||||||
|
for (idx, job) in jobs.into_iter().enumerate() {
|
||||||
|
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
|
||||||
|
ScheduledCompactionTask {
|
||||||
|
options: job,
|
||||||
|
// The last job in the queue sends the signal and releases the gc guard
|
||||||
|
result_tx: next_scheduled_compaction_task
|
||||||
|
.result_tx
|
||||||
|
.take(),
|
||||||
|
gc_block: next_scheduled_compaction_task
|
||||||
|
.gc_block
|
||||||
|
.take(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ScheduledCompactionTask {
|
||||||
|
options: job,
|
||||||
|
result_tx: None,
|
||||||
|
gc_block: None,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
let _ = timeline
|
let _ = timeline
|
||||||
.compact_with_options(
|
.compact_with_options(
|
||||||
@@ -3062,15 +3104,22 @@ impl Tenant {
|
|||||||
&self,
|
&self,
|
||||||
timeline_id: TimelineId,
|
timeline_id: TimelineId,
|
||||||
options: CompactOptions,
|
options: CompactOptions,
|
||||||
) -> tokio::sync::oneshot::Receiver<()> {
|
) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
|
||||||
|
let gc_guard = match self.gc_block.start().await {
|
||||||
|
Ok(guard) => guard,
|
||||||
|
Err(e) => {
|
||||||
|
bail!("cannot run gc-compaction because gc is blocked: {}", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
|
||||||
let tline_pending_tasks = guard.entry(timeline_id).or_default();
|
let tline_pending_tasks = guard.entry(timeline_id).or_default();
|
||||||
tline_pending_tasks.push_back(ScheduledCompactionTask {
|
tline_pending_tasks.push_back(ScheduledCompactionTask {
|
||||||
options,
|
options,
|
||||||
result_tx: Some(tx),
|
result_tx: Some(tx),
|
||||||
|
gc_block: Some(gc_guard),
|
||||||
});
|
});
|
||||||
rx
|
Ok(rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call through to all timelines to freeze ephemeral layers if needed. Usually
|
// Call through to all timelines to freeze ephemeral layers if needed. Usually
|
||||||
@@ -8117,6 +8166,12 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
{
|
{
|
||||||
|
tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x30))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = tline.gc_info.write().unwrap();
|
let mut guard = tline.gc_info.write().unwrap();
|
||||||
guard.cutoffs.time = Lsn(0x30);
|
guard.cutoffs.time = Lsn(0x30);
|
||||||
@@ -8219,6 +8274,12 @@ mod tests {
|
|||||||
|
|
||||||
// increase GC horizon and compact again
|
// increase GC horizon and compact again
|
||||||
{
|
{
|
||||||
|
tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x40))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = tline.gc_info.write().unwrap();
|
let mut guard = tline.gc_info.write().unwrap();
|
||||||
guard.cutoffs.time = Lsn(0x40);
|
guard.cutoffs.time = Lsn(0x40);
|
||||||
@@ -8599,6 +8660,12 @@ mod tests {
|
|||||||
.await?
|
.await?
|
||||||
};
|
};
|
||||||
{
|
{
|
||||||
|
tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x30))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = tline.gc_info.write().unwrap();
|
let mut guard = tline.gc_info.write().unwrap();
|
||||||
*guard = GcInfo {
|
*guard = GcInfo {
|
||||||
@@ -8680,6 +8747,12 @@ mod tests {
|
|||||||
|
|
||||||
// increase GC horizon and compact again
|
// increase GC horizon and compact again
|
||||||
{
|
{
|
||||||
|
tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x40))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = tline.gc_info.write().unwrap();
|
let mut guard = tline.gc_info.write().unwrap();
|
||||||
guard.cutoffs.time = Lsn(0x40);
|
guard.cutoffs.time = Lsn(0x40);
|
||||||
@@ -9127,6 +9200,12 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
{
|
{
|
||||||
|
tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x30))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = tline.gc_info.write().unwrap();
|
let mut guard = tline.gc_info.write().unwrap();
|
||||||
*guard = GcInfo {
|
*guard = GcInfo {
|
||||||
@@ -9244,7 +9323,7 @@ mod tests {
|
|||||||
CompactOptions {
|
CompactOptions {
|
||||||
flags: dryrun_flags,
|
flags: dryrun_flags,
|
||||||
compact_range: None,
|
compact_range: None,
|
||||||
compact_below_lsn: None,
|
..Default::default()
|
||||||
},
|
},
|
||||||
&ctx,
|
&ctx,
|
||||||
)
|
)
|
||||||
@@ -9269,6 +9348,12 @@ mod tests {
|
|||||||
|
|
||||||
// increase GC horizon and compact again
|
// increase GC horizon and compact again
|
||||||
{
|
{
|
||||||
|
tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x38))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = tline.gc_info.write().unwrap();
|
let mut guard = tline.gc_info.write().unwrap();
|
||||||
guard.cutoffs.time = Lsn(0x38);
|
guard.cutoffs.time = Lsn(0x38);
|
||||||
@@ -9364,6 +9449,12 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
{
|
{
|
||||||
|
tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x30))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = tline.gc_info.write().unwrap();
|
let mut guard = tline.gc_info.write().unwrap();
|
||||||
*guard = GcInfo {
|
*guard = GcInfo {
|
||||||
@@ -9481,7 +9572,7 @@ mod tests {
|
|||||||
CompactOptions {
|
CompactOptions {
|
||||||
flags: dryrun_flags,
|
flags: dryrun_flags,
|
||||||
compact_range: None,
|
compact_range: None,
|
||||||
compact_below_lsn: None,
|
..Default::default()
|
||||||
},
|
},
|
||||||
&ctx,
|
&ctx,
|
||||||
)
|
)
|
||||||
@@ -9608,6 +9699,12 @@ mod tests {
|
|||||||
branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));
|
branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10)));
|
||||||
|
|
||||||
{
|
{
|
||||||
|
parent_tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x10))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = parent_tline.gc_info.write().unwrap();
|
let mut guard = parent_tline.gc_info.write().unwrap();
|
||||||
*guard = GcInfo {
|
*guard = GcInfo {
|
||||||
@@ -9622,6 +9719,12 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
branch_tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x50))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = branch_tline.gc_info.write().unwrap();
|
let mut guard = branch_tline.gc_info.write().unwrap();
|
||||||
*guard = GcInfo {
|
*guard = GcInfo {
|
||||||
@@ -9951,6 +10054,12 @@ mod tests {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
{
|
{
|
||||||
|
tline
|
||||||
|
.latest_gc_cutoff_lsn
|
||||||
|
.lock_for_write()
|
||||||
|
.store_and_unlock(Lsn(0x30))
|
||||||
|
.wait()
|
||||||
|
.await;
|
||||||
// Update GC info
|
// Update GC info
|
||||||
let mut guard = tline.gc_info.write().unwrap();
|
let mut guard = tline.gc_info.write().unwrap();
|
||||||
*guard = GcInfo {
|
*guard = GcInfo {
|
||||||
@@ -9973,7 +10082,7 @@ mod tests {
|
|||||||
CompactOptions {
|
CompactOptions {
|
||||||
flags: EnumSet::new(),
|
flags: EnumSet::new(),
|
||||||
compact_range: Some((get_key(0)..get_key(2)).into()),
|
compact_range: Some((get_key(0)..get_key(2)).into()),
|
||||||
compact_below_lsn: None,
|
..Default::default()
|
||||||
},
|
},
|
||||||
&ctx,
|
&ctx,
|
||||||
)
|
)
|
||||||
@@ -10020,7 +10129,7 @@ mod tests {
|
|||||||
CompactOptions {
|
CompactOptions {
|
||||||
flags: EnumSet::new(),
|
flags: EnumSet::new(),
|
||||||
compact_range: Some((get_key(2)..get_key(4)).into()),
|
compact_range: Some((get_key(2)..get_key(4)).into()),
|
||||||
compact_below_lsn: None,
|
..Default::default()
|
||||||
},
|
},
|
||||||
&ctx,
|
&ctx,
|
||||||
)
|
)
|
||||||
@@ -10072,7 +10181,7 @@ mod tests {
|
|||||||
CompactOptions {
|
CompactOptions {
|
||||||
flags: EnumSet::new(),
|
flags: EnumSet::new(),
|
||||||
compact_range: Some((get_key(4)..get_key(9)).into()),
|
compact_range: Some((get_key(4)..get_key(9)).into()),
|
||||||
compact_below_lsn: None,
|
..Default::default()
|
||||||
},
|
},
|
||||||
&ctx,
|
&ctx,
|
||||||
)
|
)
|
||||||
@@ -10123,7 +10232,7 @@ mod tests {
|
|||||||
CompactOptions {
|
CompactOptions {
|
||||||
flags: EnumSet::new(),
|
flags: EnumSet::new(),
|
||||||
compact_range: Some((get_key(9)..get_key(10)).into()),
|
compact_range: Some((get_key(9)..get_key(10)).into()),
|
||||||
compact_below_lsn: None,
|
..Default::default()
|
||||||
},
|
},
|
||||||
&ctx,
|
&ctx,
|
||||||
)
|
)
|
||||||
@@ -10179,7 +10288,7 @@ mod tests {
|
|||||||
CompactOptions {
|
CompactOptions {
|
||||||
flags: EnumSet::new(),
|
flags: EnumSet::new(),
|
||||||
compact_range: Some((get_key(0)..get_key(10)).into()),
|
compact_range: Some((get_key(0)..get_key(10)).into()),
|
||||||
compact_below_lsn: None,
|
..Default::default()
|
||||||
},
|
},
|
||||||
&ctx,
|
&ctx,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::collections::HashMap;
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
use utils::id::TimelineId;
|
use utils::id::TimelineId;
|
||||||
|
|
||||||
@@ -20,7 +20,7 @@ pub(crate) struct GcBlock {
|
|||||||
/// Do not add any more features taking and forbidding taking this lock. It should be
|
/// Do not add any more features taking and forbidding taking this lock. It should be
|
||||||
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
|
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
|
||||||
/// synchronizes with gc attempts by locking and unlocking this mutex.
|
/// synchronizes with gc attempts by locking and unlocking this mutex.
|
||||||
blocking: tokio::sync::Mutex<()>,
|
blocking: Arc<tokio::sync::Mutex<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GcBlock {
|
impl GcBlock {
|
||||||
@@ -30,7 +30,7 @@ impl GcBlock {
|
|||||||
/// it's ending, or if not currently possible, a value describing the reasons why not.
|
/// it's ending, or if not currently possible, a value describing the reasons why not.
|
||||||
///
|
///
|
||||||
/// Cancellation safe.
|
/// Cancellation safe.
|
||||||
pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
|
pub(super) async fn start(&self) -> Result<Guard, BlockingReasons> {
|
||||||
let reasons = {
|
let reasons = {
|
||||||
let g = self.reasons.lock().unwrap();
|
let g = self.reasons.lock().unwrap();
|
||||||
|
|
||||||
@@ -44,7 +44,7 @@ impl GcBlock {
|
|||||||
Err(reasons)
|
Err(reasons)
|
||||||
} else {
|
} else {
|
||||||
Ok(Guard {
|
Ok(Guard {
|
||||||
_inner: self.blocking.lock().await,
|
_inner: self.blocking.clone().lock_owned().await,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -170,8 +170,8 @@ impl GcBlock {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) struct Guard<'a> {
|
pub(crate) struct Guard {
|
||||||
_inner: tokio::sync::MutexGuard<'a, ()>,
|
_inner: tokio::sync::OwnedMutexGuard<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|||||||
@@ -785,6 +785,9 @@ pub(crate) struct CompactRequest {
|
|||||||
/// Whether the compaction job should be scheduled.
|
/// Whether the compaction job should be scheduled.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub scheduled: bool,
|
pub scheduled: bool,
|
||||||
|
/// Whether the compaction job should be split across key ranges.
|
||||||
|
#[serde(default)]
|
||||||
|
pub sub_compaction: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[serde_with::serde_as]
|
#[serde_with::serde_as]
|
||||||
@@ -814,6 +817,9 @@ pub(crate) struct CompactOptions {
|
|||||||
/// If set, the compaction will only compact the LSN below this value.
|
/// If set, the compaction will only compact the LSN below this value.
|
||||||
/// This option is only used by GC compaction.
|
/// This option is only used by GC compaction.
|
||||||
pub compact_below_lsn: Option<Lsn>,
|
pub compact_below_lsn: Option<Lsn>,
|
||||||
|
/// Enable sub-compaction (split compaction job across key ranges).
|
||||||
|
/// This option is only used by GC compaction.
|
||||||
|
pub sub_compaction: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for Timeline {
|
impl std::fmt::Debug for Timeline {
|
||||||
@@ -1637,6 +1643,7 @@ impl Timeline {
|
|||||||
flags,
|
flags,
|
||||||
compact_range: None,
|
compact_range: None,
|
||||||
compact_below_lsn: None,
|
compact_below_lsn: None,
|
||||||
|
sub_compaction: false,
|
||||||
},
|
},
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -10,8 +10,8 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use super::layer_manager::LayerManager;
|
use super::layer_manager::LayerManager;
|
||||||
use super::{
|
use super::{
|
||||||
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
|
CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder,
|
||||||
RecordedDuration, Timeline,
|
ImageLayerCreationMode, RecordedDuration, Timeline,
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Context};
|
use anyhow::{anyhow, bail, Context};
|
||||||
@@ -29,7 +29,6 @@ use utils::id::TimelineId;
|
|||||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||||
use crate::page_cache;
|
use crate::page_cache;
|
||||||
use crate::statvfs::Statvfs;
|
use crate::statvfs::Statvfs;
|
||||||
use crate::tenant::checks::check_valid_layermap;
|
|
||||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||||
use crate::tenant::storage_layer::batch_split_writer::{
|
use crate::tenant::storage_layer::batch_split_writer::{
|
||||||
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
|
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
|
||||||
@@ -42,7 +41,7 @@ use crate::tenant::storage_layer::{
|
|||||||
use crate::tenant::timeline::ImageLayerCreationOutcome;
|
use crate::tenant::timeline::ImageLayerCreationOutcome;
|
||||||
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
|
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
|
||||||
use crate::tenant::timeline::{Layer, ResidentLayer};
|
use crate::tenant::timeline::{Layer, ResidentLayer};
|
||||||
use crate::tenant::{DeltaLayer, MaybeOffloaded};
|
use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
|
||||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||||
use pageserver_api::config::tenant_conf_defaults::{
|
use pageserver_api::config::tenant_conf_defaults::{
|
||||||
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
|
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
|
||||||
@@ -64,9 +63,12 @@ use super::CompactionError;
|
|||||||
const COMPACTION_DELTA_THRESHOLD: usize = 5;
|
const COMPACTION_DELTA_THRESHOLD: usize = 5;
|
||||||
|
|
||||||
/// A scheduled compaction task.
|
/// A scheduled compaction task.
|
||||||
pub struct ScheduledCompactionTask {
|
pub(crate) struct ScheduledCompactionTask {
|
||||||
pub options: CompactOptions,
|
pub options: CompactOptions,
|
||||||
|
/// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
|
||||||
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
|
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
|
||||||
|
/// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
|
||||||
|
pub gc_block: Option<gc_block::Guard>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct GcCompactionJobDescription {
|
pub struct GcCompactionJobDescription {
|
||||||
@@ -1752,6 +1754,115 @@ impl Timeline {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Split a gc-compaction job into multiple compaction jobs. Optimally, this function should return a vector of
|
||||||
|
/// `GcCompactionJobDesc`. But we want to keep it simple on the tenant scheduling side without exposing too much
|
||||||
|
/// ad-hoc information about gc compaction itself.
|
||||||
|
pub(crate) async fn gc_compaction_split_jobs(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
options: CompactOptions,
|
||||||
|
) -> anyhow::Result<Vec<CompactOptions>> {
|
||||||
|
if !options.sub_compaction {
|
||||||
|
return Ok(vec![options]);
|
||||||
|
}
|
||||||
|
let compact_range = options.compact_range.clone().unwrap_or(CompactRange {
|
||||||
|
start: Key::MIN,
|
||||||
|
end: Key::MAX,
|
||||||
|
});
|
||||||
|
let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
|
||||||
|
compact_below_lsn
|
||||||
|
} else {
|
||||||
|
*self.get_latest_gc_cutoff_lsn() // use the real gc cutoff
|
||||||
|
};
|
||||||
|
let mut compact_jobs = Vec::new();
|
||||||
|
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
|
||||||
|
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
|
||||||
|
let Ok(partition) = self.partitioning.try_lock() else {
|
||||||
|
bail!("failed to acquire partition lock");
|
||||||
|
};
|
||||||
|
let ((dense_ks, sparse_ks), _) = &*partition;
|
||||||
|
// Truncate the key range to be within user specified compaction range.
|
||||||
|
fn truncate_to(
|
||||||
|
source_start: &Key,
|
||||||
|
source_end: &Key,
|
||||||
|
target_start: &Key,
|
||||||
|
target_end: &Key,
|
||||||
|
) -> Option<(Key, Key)> {
|
||||||
|
let start = source_start.max(target_start);
|
||||||
|
let end = source_end.min(target_end);
|
||||||
|
if start < end {
|
||||||
|
Some((*start, *end))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let mut split_key_ranges = Vec::new();
|
||||||
|
let ranges = dense_ks
|
||||||
|
.parts
|
||||||
|
.iter()
|
||||||
|
.map(|partition| partition.ranges.iter())
|
||||||
|
.chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
|
||||||
|
.flatten()
|
||||||
|
.cloned()
|
||||||
|
.collect_vec();
|
||||||
|
for range in ranges.iter() {
|
||||||
|
let Some((start, end)) = truncate_to(
|
||||||
|
&range.start,
|
||||||
|
&range.end,
|
||||||
|
&compact_range.start,
|
||||||
|
&compact_range.end,
|
||||||
|
) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
split_key_ranges.push((start, end));
|
||||||
|
}
|
||||||
|
split_key_ranges.sort();
|
||||||
|
let guard = self.layers.read().await;
|
||||||
|
let layer_map = guard.layer_map()?;
|
||||||
|
let mut current_start = None;
|
||||||
|
// Split compaction job to about 2GB each
|
||||||
|
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be configuration in the future
|
||||||
|
let ranges_num = split_key_ranges.len();
|
||||||
|
for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
|
||||||
|
if current_start.is_none() {
|
||||||
|
current_start = Some(start);
|
||||||
|
}
|
||||||
|
let start = current_start.unwrap();
|
||||||
|
if start >= end {
|
||||||
|
// We have already processed this partition.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let res = layer_map.range_search(start..end, compact_below_lsn);
|
||||||
|
let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
|
||||||
|
if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
|
||||||
|
let mut compact_options = options.clone();
|
||||||
|
// Try to extend the compaction range so that we include at least one full layer file.
|
||||||
|
let extended_end = res
|
||||||
|
.found
|
||||||
|
.keys()
|
||||||
|
.map(|layer| layer.layer.key_range.end)
|
||||||
|
.min();
|
||||||
|
// It is possible that the search range does not contain any layer files when we reach the end of the loop.
|
||||||
|
// In this case, we simply use the specified key range end.
|
||||||
|
let end = if let Some(extended_end) = extended_end {
|
||||||
|
extended_end.max(end)
|
||||||
|
} else {
|
||||||
|
end
|
||||||
|
};
|
||||||
|
info!(
|
||||||
|
"splitting compaction job: {}..{}, estimated_size={}",
|
||||||
|
start, end, total_size
|
||||||
|
);
|
||||||
|
compact_options.compact_range = Some(CompactRange { start, end });
|
||||||
|
compact_options.compact_below_lsn = Some(compact_below_lsn);
|
||||||
|
compact_options.sub_compaction = false;
|
||||||
|
compact_jobs.push(compact_options);
|
||||||
|
current_start = Some(end);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(guard);
|
||||||
|
Ok(compact_jobs)
|
||||||
|
}
|
||||||
|
|
||||||
/// An experimental compaction building block that combines compaction with garbage collection.
|
/// An experimental compaction building block that combines compaction with garbage collection.
|
||||||
///
|
///
|
||||||
/// The current implementation picks all delta + image layers that are below or intersecting with
|
/// The current implementation picks all delta + image layers that are below or intersecting with
|
||||||
@@ -1774,6 +1885,36 @@ impl Timeline {
|
|||||||
options: CompactOptions,
|
options: CompactOptions,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
if options.sub_compaction {
|
||||||
|
info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
|
||||||
|
let jobs = self.gc_compaction_split_jobs(options).await?;
|
||||||
|
let jobs_len = jobs.len();
|
||||||
|
for (idx, job) in jobs.into_iter().enumerate() {
|
||||||
|
info!(
|
||||||
|
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
|
||||||
|
idx + 1,
|
||||||
|
jobs_len
|
||||||
|
);
|
||||||
|
self.compact_with_gc_inner(cancel, job, ctx).await?;
|
||||||
|
}
|
||||||
|
if jobs_len == 0 {
|
||||||
|
info!("no jobs to run, skipping gc bottom-most compaction");
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
self.compact_with_gc_inner(cancel, options, ctx).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn compact_with_gc_inner(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
cancel: &CancellationToken,
|
||||||
|
options: CompactOptions,
|
||||||
|
ctx: &RequestContext,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
assert!(
|
||||||
|
!options.sub_compaction,
|
||||||
|
"sub-compaction should be handled by the outer function"
|
||||||
|
);
|
||||||
// Block other compaction/GC tasks from running for now. GC-compaction could run along
|
// Block other compaction/GC tasks from running for now. GC-compaction could run along
|
||||||
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
|
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
|
||||||
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
|
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
|
||||||
@@ -1823,7 +1964,11 @@ impl Timeline {
|
|||||||
let gc_info = self.gc_info.read().unwrap();
|
let gc_info = self.gc_info.read().unwrap();
|
||||||
let mut retain_lsns_below_horizon = Vec::new();
|
let mut retain_lsns_below_horizon = Vec::new();
|
||||||
let gc_cutoff = {
|
let gc_cutoff = {
|
||||||
let real_gc_cutoff = gc_info.cutoffs.select_min();
|
// Currently, gc-compaction only kicks in after the legacy gc has updated the gc_cutoff.
|
||||||
|
// Therefore, it can only clean up data that cannot be cleaned up with legacy gc, instead of
|
||||||
|
// cleaning everything that theoritically it could. In the future, it should use `self.gc_info`
|
||||||
|
// to get the truth data.
|
||||||
|
let real_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
|
||||||
// The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
|
// The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
|
||||||
// each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use
|
// each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use
|
||||||
// the real cutoff.
|
// the real cutoff.
|
||||||
@@ -1943,14 +2088,15 @@ impl Timeline {
|
|||||||
|
|
||||||
// Step 1: construct a k-merge iterator over all layers.
|
// Step 1: construct a k-merge iterator over all layers.
|
||||||
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
|
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
|
||||||
let layer_names = job_desc
|
// disable the check for now because we need to adjust the check for partial compactions, will enable later.
|
||||||
.selected_layers
|
// let layer_names = job_desc
|
||||||
.iter()
|
// .selected_layers
|
||||||
.map(|layer| layer.layer_desc().layer_name())
|
// .iter()
|
||||||
.collect_vec();
|
// .map(|layer| layer.layer_desc().layer_name())
|
||||||
if let Some(err) = check_valid_layermap(&layer_names) {
|
// .collect_vec();
|
||||||
warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
|
// if let Some(err) = check_valid_layermap(&layer_names) {
|
||||||
}
|
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
|
||||||
|
// }
|
||||||
// The maximum LSN we are processing in this compaction loop
|
// The maximum LSN we are processing in this compaction loop
|
||||||
let end_lsn = job_desc
|
let end_lsn = job_desc
|
||||||
.selected_layers
|
.selected_layers
|
||||||
|
|||||||
@@ -369,6 +369,13 @@ pub(super) async fn handle_walreceiver_connection(
|
|||||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||||
let mut modification = timeline.begin_modification(Lsn(0));
|
let mut modification = timeline.begin_modification(Lsn(0));
|
||||||
|
|
||||||
|
if !records.is_empty() {
|
||||||
|
timeline
|
||||||
|
.metrics
|
||||||
|
.wal_records_received
|
||||||
|
.inc_by(records.len() as u64);
|
||||||
|
}
|
||||||
|
|
||||||
for interpreted in records {
|
for interpreted in records {
|
||||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||||
&& uncommitted_records > 0
|
&& uncommitted_records > 0
|
||||||
@@ -510,6 +517,7 @@ pub(super) async fn handle_walreceiver_connection(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Ingest the records without immediately committing them.
|
// Ingest the records without immediately committing them.
|
||||||
|
timeline.metrics.wal_records_received.inc();
|
||||||
let ingested = walingest
|
let ingested = walingest
|
||||||
.ingest_record(interpreted, &mut modification, &ctx)
|
.ingest_record(interpreted, &mut modification, &ctx)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -610,6 +610,9 @@ prefetch_read(PrefetchRequest *slot)
|
|||||||
{
|
{
|
||||||
NeonResponse *response;
|
NeonResponse *response;
|
||||||
MemoryContext old;
|
MemoryContext old;
|
||||||
|
BufferTag buftag;
|
||||||
|
shardno_t shard_no;
|
||||||
|
uint64 my_ring_index;
|
||||||
|
|
||||||
Assert(slot->status == PRFS_REQUESTED);
|
Assert(slot->status == PRFS_REQUESTED);
|
||||||
Assert(slot->response == NULL);
|
Assert(slot->response == NULL);
|
||||||
@@ -623,11 +626,29 @@ prefetch_read(PrefetchRequest *slot)
|
|||||||
slot->status, slot->response,
|
slot->status, slot->response,
|
||||||
(long)slot->my_ring_index, (long)MyPState->ring_receive);
|
(long)slot->my_ring_index, (long)MyPState->ring_receive);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copy the request info so that if an error happens and the prefetch
|
||||||
|
* queue is flushed during the receive call, we can print the original
|
||||||
|
* values in the error message
|
||||||
|
*/
|
||||||
|
buftag = slot->buftag;
|
||||||
|
shard_no = slot->shard_no;
|
||||||
|
my_ring_index = slot->my_ring_index;
|
||||||
|
|
||||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||||
response = (NeonResponse *) page_server->receive(slot->shard_no);
|
response = (NeonResponse *) page_server->receive(shard_no);
|
||||||
MemoryContextSwitchTo(old);
|
MemoryContextSwitchTo(old);
|
||||||
if (response)
|
if (response)
|
||||||
{
|
{
|
||||||
|
/* The slot should still be valid */
|
||||||
|
if (slot->status != PRFS_REQUESTED ||
|
||||||
|
slot->response != NULL ||
|
||||||
|
slot->my_ring_index != MyPState->ring_receive)
|
||||||
|
neon_shard_log(shard_no, ERROR,
|
||||||
|
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
|
||||||
|
slot->status, slot->response,
|
||||||
|
(long) slot->my_ring_index, (long) MyPState->ring_receive);
|
||||||
|
|
||||||
/* update prefetch state */
|
/* update prefetch state */
|
||||||
MyPState->n_responses_buffered += 1;
|
MyPState->n_responses_buffered += 1;
|
||||||
MyPState->n_requests_inflight -= 1;
|
MyPState->n_requests_inflight -= 1;
|
||||||
@@ -642,11 +663,15 @@ prefetch_read(PrefetchRequest *slot)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
neon_shard_log(slot->shard_no, LOG,
|
/*
|
||||||
|
* Note: The slot might no longer be valid, if the connection was lost
|
||||||
|
* and the prefetch queue was flushed during the receive call
|
||||||
|
*/
|
||||||
|
neon_shard_log(shard_no, LOG,
|
||||||
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
|
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
|
||||||
(long)slot->my_ring_index,
|
(long) my_ring_index,
|
||||||
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
|
RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)),
|
||||||
slot->buftag.forkNum, slot->buftag.blockNum);
|
buftag.forkNum, buftag.blockNum);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -83,14 +83,20 @@ impl Env {
|
|||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
ttid: TenantTimelineId,
|
ttid: TenantTimelineId,
|
||||||
) -> anyhow::Result<Arc<Timeline>> {
|
) -> anyhow::Result<Arc<Timeline>> {
|
||||||
let conf = self.make_conf(node_id);
|
let conf = Arc::new(self.make_conf(node_id));
|
||||||
let timeline_dir = get_timeline_dir(&conf, &ttid);
|
let timeline_dir = get_timeline_dir(&conf, &ttid);
|
||||||
let remote_path = remote_timeline_path(&ttid)?;
|
let remote_path = remote_timeline_path(&ttid)?;
|
||||||
|
|
||||||
let safekeeper = self.make_safekeeper(node_id, ttid).await?;
|
let safekeeper = self.make_safekeeper(node_id, ttid).await?;
|
||||||
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
|
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
|
||||||
|
|
||||||
let timeline = Timeline::new(ttid, &timeline_dir, &remote_path, shared_state);
|
let timeline = Timeline::new(
|
||||||
|
ttid,
|
||||||
|
&timeline_dir,
|
||||||
|
&remote_path,
|
||||||
|
shared_state,
|
||||||
|
conf.clone(),
|
||||||
|
);
|
||||||
timeline.bootstrap(
|
timeline.bootstrap(
|
||||||
&mut timeline.write_shared_state().await,
|
&mut timeline.write_shared_state().await,
|
||||||
&conf,
|
&conf,
|
||||||
|
|||||||
@@ -338,7 +338,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let conf = SafeKeeperConf {
|
let conf = Arc::new(SafeKeeperConf {
|
||||||
workdir,
|
workdir,
|
||||||
my_id: id,
|
my_id: id,
|
||||||
listen_pg_addr: args.listen_pg,
|
listen_pg_addr: args.listen_pg,
|
||||||
@@ -368,7 +368,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
control_file_save_interval: args.control_file_save_interval,
|
control_file_save_interval: args.control_file_save_interval,
|
||||||
partial_backup_concurrency: args.partial_backup_concurrency,
|
partial_backup_concurrency: args.partial_backup_concurrency,
|
||||||
eviction_min_resident: args.eviction_min_resident,
|
eviction_min_resident: args.eviction_min_resident,
|
||||||
};
|
});
|
||||||
|
|
||||||
// initialize sentry if SENTRY_DSN is provided
|
// initialize sentry if SENTRY_DSN is provided
|
||||||
let _sentry_guard = init_sentry(
|
let _sentry_guard = init_sentry(
|
||||||
@@ -382,7 +382,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
/// complete, e.g. panicked, inner is error produced by task itself.
|
/// complete, e.g. panicked, inner is error produced by task itself.
|
||||||
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
|
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
|
||||||
|
|
||||||
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||||
// fsync the datadir to make sure we have a consistent state on disk.
|
// fsync the datadir to make sure we have a consistent state on disk.
|
||||||
if !conf.no_sync {
|
if !conf.no_sync {
|
||||||
let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
|
let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
|
||||||
@@ -428,9 +428,11 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
|||||||
e
|
e
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));
|
||||||
|
|
||||||
// Register metrics collector for active timelines. It's important to do this
|
// Register metrics collector for active timelines. It's important to do this
|
||||||
// after daemonizing, otherwise process collector will be upset.
|
// after daemonizing, otherwise process collector will be upset.
|
||||||
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
|
let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
|
||||||
metrics::register_internal(Box::new(timeline_collector))?;
|
metrics::register_internal(Box::new(timeline_collector))?;
|
||||||
|
|
||||||
wal_backup::init_remote_storage(&conf).await;
|
wal_backup::init_remote_storage(&conf).await;
|
||||||
@@ -447,9 +449,8 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
|||||||
.then(|| Handle::try_current().expect("no runtime in main"));
|
.then(|| Handle::try_current().expect("no runtime in main"));
|
||||||
|
|
||||||
// Load all timelines from disk to memory.
|
// Load all timelines from disk to memory.
|
||||||
GlobalTimelines::init(conf.clone()).await?;
|
global_timelines.init().await?;
|
||||||
|
|
||||||
let conf_ = conf.clone();
|
|
||||||
// Run everything in current thread rt, if asked.
|
// Run everything in current thread rt, if asked.
|
||||||
if conf.current_thread_runtime {
|
if conf.current_thread_runtime {
|
||||||
info!("running in current thread runtime");
|
info!("running in current thread runtime");
|
||||||
@@ -459,14 +460,16 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||||
.spawn(wal_service::task_main(
|
.spawn(wal_service::task_main(
|
||||||
conf_,
|
conf.clone(),
|
||||||
pg_listener,
|
pg_listener,
|
||||||
Scope::SafekeeperData,
|
Scope::SafekeeperData,
|
||||||
|
global_timelines.clone(),
|
||||||
))
|
))
|
||||||
// wrap with task name for error reporting
|
// wrap with task name for error reporting
|
||||||
.map(|res| ("WAL service main".to_owned(), res));
|
.map(|res| ("WAL service main".to_owned(), res));
|
||||||
tasks_handles.push(Box::pin(wal_service_handle));
|
tasks_handles.push(Box::pin(wal_service_handle));
|
||||||
|
|
||||||
|
let global_timelines_ = global_timelines.clone();
|
||||||
let timeline_housekeeping_handle = current_thread_rt
|
let timeline_housekeeping_handle = current_thread_rt
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||||
@@ -474,40 +477,45 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
|||||||
const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
|
const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(TOMBSTONE_TTL).await;
|
tokio::time::sleep(TOMBSTONE_TTL).await;
|
||||||
GlobalTimelines::housekeeping(&TOMBSTONE_TTL);
|
global_timelines_.housekeeping(&TOMBSTONE_TTL);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.map(|res| ("Timeline map housekeeping".to_owned(), res));
|
.map(|res| ("Timeline map housekeeping".to_owned(), res));
|
||||||
tasks_handles.push(Box::pin(timeline_housekeeping_handle));
|
tasks_handles.push(Box::pin(timeline_housekeeping_handle));
|
||||||
|
|
||||||
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
|
if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
|
||||||
let conf_ = conf.clone();
|
|
||||||
let wal_service_handle = current_thread_rt
|
let wal_service_handle = current_thread_rt
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||||
.spawn(wal_service::task_main(
|
.spawn(wal_service::task_main(
|
||||||
conf_,
|
conf.clone(),
|
||||||
pg_listener_tenant_only,
|
pg_listener_tenant_only,
|
||||||
Scope::Tenant,
|
Scope::Tenant,
|
||||||
|
global_timelines.clone(),
|
||||||
))
|
))
|
||||||
// wrap with task name for error reporting
|
// wrap with task name for error reporting
|
||||||
.map(|res| ("WAL service tenant only main".to_owned(), res));
|
.map(|res| ("WAL service tenant only main".to_owned(), res));
|
||||||
tasks_handles.push(Box::pin(wal_service_handle));
|
tasks_handles.push(Box::pin(wal_service_handle));
|
||||||
}
|
}
|
||||||
|
|
||||||
let conf_ = conf.clone();
|
|
||||||
let http_handle = current_thread_rt
|
let http_handle = current_thread_rt
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap_or_else(|| HTTP_RUNTIME.handle())
|
.unwrap_or_else(|| HTTP_RUNTIME.handle())
|
||||||
.spawn(http::task_main(conf_, http_listener))
|
.spawn(http::task_main(
|
||||||
|
conf.clone(),
|
||||||
|
http_listener,
|
||||||
|
global_timelines.clone(),
|
||||||
|
))
|
||||||
.map(|res| ("HTTP service main".to_owned(), res));
|
.map(|res| ("HTTP service main".to_owned(), res));
|
||||||
tasks_handles.push(Box::pin(http_handle));
|
tasks_handles.push(Box::pin(http_handle));
|
||||||
|
|
||||||
let conf_ = conf.clone();
|
|
||||||
let broker_task_handle = current_thread_rt
|
let broker_task_handle = current_thread_rt
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap_or_else(|| BROKER_RUNTIME.handle())
|
.unwrap_or_else(|| BROKER_RUNTIME.handle())
|
||||||
.spawn(broker::task_main(conf_).instrument(info_span!("broker")))
|
.spawn(
|
||||||
|
broker::task_main(conf.clone(), global_timelines.clone())
|
||||||
|
.instrument(info_span!("broker")),
|
||||||
|
)
|
||||||
.map(|res| ("broker main".to_owned(), res));
|
.map(|res| ("broker main".to_owned(), res));
|
||||||
tasks_handles.push(Box::pin(broker_task_handle));
|
tasks_handles.push(Box::pin(broker_task_handle));
|
||||||
|
|
||||||
|
|||||||
@@ -39,14 +39,17 @@ const RETRY_INTERVAL_MSEC: u64 = 1000;
|
|||||||
const PUSH_INTERVAL_MSEC: u64 = 1000;
|
const PUSH_INTERVAL_MSEC: u64 = 1000;
|
||||||
|
|
||||||
/// Push once in a while data about all active timelines to the broker.
|
/// Push once in a while data about all active timelines to the broker.
|
||||||
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
async fn push_loop(
|
||||||
|
conf: Arc<SafeKeeperConf>,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
if conf.disable_periodic_broker_push {
|
if conf.disable_periodic_broker_push {
|
||||||
info!("broker push_loop is disabled, doing nothing...");
|
info!("broker push_loop is disabled, doing nothing...");
|
||||||
futures::future::pending::<()>().await; // sleep forever
|
futures::future::pending::<()>().await; // sleep forever
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let active_timelines_set = GlobalTimelines::get_global_broker_active_set();
|
let active_timelines_set = global_timelines.get_global_broker_active_set();
|
||||||
|
|
||||||
let mut client =
|
let mut client =
|
||||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||||
@@ -87,8 +90,13 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
|||||||
|
|
||||||
/// Subscribe and fetch all the interesting data from the broker.
|
/// Subscribe and fetch all the interesting data from the broker.
|
||||||
#[instrument(name = "broker_pull", skip_all)]
|
#[instrument(name = "broker_pull", skip_all)]
|
||||||
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
|
async fn pull_loop(
|
||||||
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
|
conf: Arc<SafeKeeperConf>,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
|
stats: Arc<BrokerStats>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut client =
|
||||||
|
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||||
|
|
||||||
// TODO: subscribe only to local timelines instead of all
|
// TODO: subscribe only to local timelines instead of all
|
||||||
let request = SubscribeSafekeeperInfoRequest {
|
let request = SubscribeSafekeeperInfoRequest {
|
||||||
@@ -113,7 +121,7 @@ async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()>
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
|
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
|
||||||
let ttid = parse_proto_ttid(proto_ttid)?;
|
let ttid = parse_proto_ttid(proto_ttid)?;
|
||||||
if let Ok(tli) = GlobalTimelines::get(ttid) {
|
if let Ok(tli) = global_timelines.get(ttid) {
|
||||||
// Note that we also receive *our own* info. That's
|
// Note that we also receive *our own* info. That's
|
||||||
// important, as it is used as an indication of live
|
// important, as it is used as an indication of live
|
||||||
// connection to the broker.
|
// connection to the broker.
|
||||||
@@ -135,7 +143,11 @@ async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()>
|
|||||||
|
|
||||||
/// Process incoming discover requests. This is done in a separate task to avoid
|
/// Process incoming discover requests. This is done in a separate task to avoid
|
||||||
/// interfering with the normal pull/push loops.
|
/// interfering with the normal pull/push loops.
|
||||||
async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
|
async fn discover_loop(
|
||||||
|
conf: Arc<SafeKeeperConf>,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
|
stats: Arc<BrokerStats>,
|
||||||
|
) -> Result<()> {
|
||||||
let mut client =
|
let mut client =
|
||||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||||
|
|
||||||
@@ -171,7 +183,7 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
|
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
|
||||||
let ttid = parse_proto_ttid(proto_ttid)?;
|
let ttid = parse_proto_ttid(proto_ttid)?;
|
||||||
if let Ok(tli) = GlobalTimelines::get(ttid) {
|
if let Ok(tli) = global_timelines.get(ttid) {
|
||||||
// we received a discovery request for a timeline we know about
|
// we received a discovery request for a timeline we know about
|
||||||
discover_counter.inc();
|
discover_counter.inc();
|
||||||
|
|
||||||
@@ -210,7 +222,10 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<
|
|||||||
bail!("end of stream");
|
bail!("end of stream");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
pub async fn task_main(
|
||||||
|
conf: Arc<SafeKeeperConf>,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
info!("started, broker endpoint {:?}", conf.broker_endpoint);
|
info!("started, broker endpoint {:?}", conf.broker_endpoint);
|
||||||
|
|
||||||
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
|
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
|
||||||
@@ -261,13 +276,13 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
|||||||
},
|
},
|
||||||
_ = ticker.tick() => {
|
_ = ticker.tick() => {
|
||||||
if push_handle.is_none() {
|
if push_handle.is_none() {
|
||||||
push_handle = Some(tokio::spawn(push_loop(conf.clone())));
|
push_handle = Some(tokio::spawn(push_loop(conf.clone(), global_timelines.clone())));
|
||||||
}
|
}
|
||||||
if pull_handle.is_none() {
|
if pull_handle.is_none() {
|
||||||
pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
|
pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), global_timelines.clone(), stats.clone())));
|
||||||
}
|
}
|
||||||
if discover_handle.is_none() {
|
if discover_handle.is_none() {
|
||||||
discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
|
discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), global_timelines.clone(), stats.clone())));
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ = &mut stats_task => {}
|
_ = &mut stats_task => {}
|
||||||
|
|||||||
@@ -1,9 +1,7 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use camino::Utf8PathBuf;
|
use camino::Utf8PathBuf;
|
||||||
|
|
||||||
use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
|
use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
|
||||||
|
use std::sync::Arc;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs::OpenOptions,
|
fs::OpenOptions,
|
||||||
io::{AsyncSeekExt, AsyncWriteExt},
|
io::{AsyncSeekExt, AsyncWriteExt},
|
||||||
@@ -14,7 +12,7 @@ use utils::{id::TenantTimelineId, lsn::Lsn};
|
|||||||
use crate::{
|
use crate::{
|
||||||
control_file::FileStorage,
|
control_file::FileStorage,
|
||||||
state::TimelinePersistentState,
|
state::TimelinePersistentState,
|
||||||
timeline::{Timeline, TimelineError, WalResidentTimeline},
|
timeline::{TimelineError, WalResidentTimeline},
|
||||||
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
|
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
|
||||||
wal_backup::copy_s3_segments,
|
wal_backup::copy_s3_segments,
|
||||||
wal_storage::{wal_file_paths, WalReader},
|
wal_storage::{wal_file_paths, WalReader},
|
||||||
@@ -25,16 +23,19 @@ use crate::{
|
|||||||
const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
|
const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
|
||||||
|
|
||||||
pub struct Request {
|
pub struct Request {
|
||||||
pub source: Arc<Timeline>,
|
pub source_ttid: TenantTimelineId,
|
||||||
pub until_lsn: Lsn,
|
pub until_lsn: Lsn,
|
||||||
pub destination_ttid: TenantTimelineId,
|
pub destination_ttid: TenantTimelineId,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_request(request: Request) -> Result<()> {
|
pub async fn handle_request(
|
||||||
|
request: Request,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
|
) -> Result<()> {
|
||||||
// TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
|
// TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
|
||||||
// if LSN will point to the middle of a WAL record, timeline will be in "broken" state
|
// if LSN will point to the middle of a WAL record, timeline will be in "broken" state
|
||||||
|
|
||||||
match GlobalTimelines::get(request.destination_ttid) {
|
match global_timelines.get(request.destination_ttid) {
|
||||||
// timeline already exists. would be good to check that this timeline is the copy
|
// timeline already exists. would be good to check that this timeline is the copy
|
||||||
// of the source timeline, but it isn't obvious how to do that
|
// of the source timeline, but it isn't obvious how to do that
|
||||||
Ok(_) => return Ok(()),
|
Ok(_) => return Ok(()),
|
||||||
@@ -46,9 +47,10 @@ pub async fn handle_request(request: Request) -> Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let source_tli = request.source.wal_residence_guard().await?;
|
let source = global_timelines.get(request.source_ttid)?;
|
||||||
|
let source_tli = source.wal_residence_guard().await?;
|
||||||
|
|
||||||
let conf = &GlobalTimelines::get_global_config();
|
let conf = &global_timelines.get_global_config();
|
||||||
let ttid = request.destination_ttid;
|
let ttid = request.destination_ttid;
|
||||||
|
|
||||||
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
|
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
|
||||||
@@ -127,7 +129,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
|
|||||||
|
|
||||||
copy_s3_segments(
|
copy_s3_segments(
|
||||||
wal_seg_size,
|
wal_seg_size,
|
||||||
&request.source.ttid,
|
&request.source_ttid,
|
||||||
&request.destination_ttid,
|
&request.destination_ttid,
|
||||||
first_segment,
|
first_segment,
|
||||||
first_ondisk_segment,
|
first_ondisk_segment,
|
||||||
@@ -158,7 +160,9 @@ pub async fn handle_request(request: Request) -> Result<()> {
|
|||||||
|
|
||||||
// now we have a ready timeline in a temp directory
|
// now we have a ready timeline in a temp directory
|
||||||
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
|
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
|
||||||
GlobalTimelines::load_temp_timeline(request.destination_ttid, &tli_dir_path, true).await?;
|
global_timelines
|
||||||
|
.load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -207,23 +207,23 @@ pub struct FileInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Build debug dump response, using the provided [`Args`] filters.
|
/// Build debug dump response, using the provided [`Args`] filters.
|
||||||
pub async fn build(args: Args) -> Result<Response> {
|
pub async fn build(args: Args, global_timelines: Arc<GlobalTimelines>) -> Result<Response> {
|
||||||
let start_time = Utc::now();
|
let start_time = Utc::now();
|
||||||
let timelines_count = GlobalTimelines::timelines_count();
|
let timelines_count = global_timelines.timelines_count();
|
||||||
let config = GlobalTimelines::get_global_config();
|
let config = global_timelines.get_global_config();
|
||||||
|
|
||||||
let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
|
let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
|
||||||
// If both tenant_id and timeline_id are specified, we can just get the
|
// If both tenant_id and timeline_id are specified, we can just get the
|
||||||
// timeline directly, without taking a snapshot of the whole list.
|
// timeline directly, without taking a snapshot of the whole list.
|
||||||
let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
|
let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
|
||||||
if let Ok(tli) = GlobalTimelines::get(ttid) {
|
if let Ok(tli) = global_timelines.get(ttid) {
|
||||||
vec![tli]
|
vec![tli]
|
||||||
} else {
|
} else {
|
||||||
vec![]
|
vec![]
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Otherwise, take a snapshot of the whole list.
|
// Otherwise, take a snapshot of the whole list.
|
||||||
GlobalTimelines::get_all()
|
global_timelines.get_all()
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut timelines = Vec::new();
|
let mut timelines = Vec::new();
|
||||||
@@ -344,12 +344,12 @@ fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
|
|||||||
|
|
||||||
/// Converts SafeKeeperConf to Config, filtering out the fields that are not
|
/// Converts SafeKeeperConf to Config, filtering out the fields that are not
|
||||||
/// supposed to be exposed.
|
/// supposed to be exposed.
|
||||||
fn build_config(config: SafeKeeperConf) -> Config {
|
fn build_config(config: Arc<SafeKeeperConf>) -> Config {
|
||||||
Config {
|
Config {
|
||||||
id: config.my_id,
|
id: config.my_id,
|
||||||
workdir: config.workdir.into(),
|
workdir: config.workdir.clone().into(),
|
||||||
listen_pg_addr: config.listen_pg_addr,
|
listen_pg_addr: config.listen_pg_addr.clone(),
|
||||||
listen_http_addr: config.listen_http_addr,
|
listen_http_addr: config.listen_http_addr.clone(),
|
||||||
no_sync: config.no_sync,
|
no_sync: config.no_sync,
|
||||||
max_offloader_lag_bytes: config.max_offloader_lag_bytes,
|
max_offloader_lag_bytes: config.max_offloader_lag_bytes,
|
||||||
wal_backup_enabled: config.wal_backup_enabled,
|
wal_backup_enabled: config.wal_backup_enabled,
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ use utils::{
|
|||||||
|
|
||||||
/// Safekeeper handler of postgres commands
|
/// Safekeeper handler of postgres commands
|
||||||
pub struct SafekeeperPostgresHandler {
|
pub struct SafekeeperPostgresHandler {
|
||||||
pub conf: SafeKeeperConf,
|
pub conf: Arc<SafeKeeperConf>,
|
||||||
/// assigned application name
|
/// assigned application name
|
||||||
pub appname: Option<String>,
|
pub appname: Option<String>,
|
||||||
pub tenant_id: Option<TenantId>,
|
pub tenant_id: Option<TenantId>,
|
||||||
@@ -43,6 +43,7 @@ pub struct SafekeeperPostgresHandler {
|
|||||||
pub protocol: Option<PostgresClientProtocol>,
|
pub protocol: Option<PostgresClientProtocol>,
|
||||||
/// Unique connection id is logged in spans for observability.
|
/// Unique connection id is logged in spans for observability.
|
||||||
pub conn_id: ConnectionId,
|
pub conn_id: ConnectionId,
|
||||||
|
pub global_timelines: Arc<GlobalTimelines>,
|
||||||
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
|
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
|
||||||
auth: Option<(Scope, Arc<JwtAuth>)>,
|
auth: Option<(Scope, Arc<JwtAuth>)>,
|
||||||
claims: Option<Claims>,
|
claims: Option<Claims>,
|
||||||
@@ -314,10 +315,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
|||||||
|
|
||||||
impl SafekeeperPostgresHandler {
|
impl SafekeeperPostgresHandler {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
conf: SafeKeeperConf,
|
conf: Arc<SafeKeeperConf>,
|
||||||
conn_id: u32,
|
conn_id: u32,
|
||||||
io_metrics: Option<TrafficMetrics>,
|
io_metrics: Option<TrafficMetrics>,
|
||||||
auth: Option<(Scope, Arc<JwtAuth>)>,
|
auth: Option<(Scope, Arc<JwtAuth>)>,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
SafekeeperPostgresHandler {
|
SafekeeperPostgresHandler {
|
||||||
conf,
|
conf,
|
||||||
@@ -331,6 +333,7 @@ impl SafekeeperPostgresHandler {
|
|||||||
claims: None,
|
claims: None,
|
||||||
auth,
|
auth,
|
||||||
io_metrics,
|
io_metrics,
|
||||||
|
global_timelines,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -360,7 +363,7 @@ impl SafekeeperPostgresHandler {
|
|||||||
pgb: &mut PostgresBackend<IO>,
|
pgb: &mut PostgresBackend<IO>,
|
||||||
) -> Result<(), QueryError> {
|
) -> Result<(), QueryError> {
|
||||||
// Get timeline, handling "not found" error
|
// Get timeline, handling "not found" error
|
||||||
let tli = match GlobalTimelines::get(self.ttid) {
|
let tli = match self.global_timelines.get(self.ttid) {
|
||||||
Ok(tli) => Ok(Some(tli)),
|
Ok(tli) => Ok(Some(tli)),
|
||||||
Err(TimelineError::NotFound(_)) => Ok(None),
|
Err(TimelineError::NotFound(_)) => Ok(None),
|
||||||
Err(e) => Err(QueryError::Other(e.into())),
|
Err(e) => Err(QueryError::Other(e.into())),
|
||||||
@@ -394,7 +397,10 @@ impl SafekeeperPostgresHandler {
|
|||||||
&mut self,
|
&mut self,
|
||||||
pgb: &mut PostgresBackend<IO>,
|
pgb: &mut PostgresBackend<IO>,
|
||||||
) -> Result<(), QueryError> {
|
) -> Result<(), QueryError> {
|
||||||
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
|
let tli = self
|
||||||
|
.global_timelines
|
||||||
|
.get(self.ttid)
|
||||||
|
.map_err(|e| QueryError::Other(e.into()))?;
|
||||||
|
|
||||||
let lsn = if self.is_walproposer_recovery() {
|
let lsn = if self.is_walproposer_recovery() {
|
||||||
// walproposer should get all local WAL until flush_lsn
|
// walproposer should get all local WAL until flush_lsn
|
||||||
|
|||||||
@@ -3,14 +3,16 @@ pub mod routes;
|
|||||||
pub use routes::make_router;
|
pub use routes::make_router;
|
||||||
|
|
||||||
pub use safekeeper_api::models;
|
pub use safekeeper_api::models;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::SafeKeeperConf;
|
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||||
|
|
||||||
pub async fn task_main(
|
pub async fn task_main(
|
||||||
conf: SafeKeeperConf,
|
conf: Arc<SafeKeeperConf>,
|
||||||
http_listener: std::net::TcpListener,
|
http_listener: std::net::TcpListener,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let router = make_router(conf)
|
let router = make_router(conf, global_timelines)
|
||||||
.build()
|
.build()
|
||||||
.map_err(|err| anyhow::anyhow!(err))?;
|
.map_err(|err| anyhow::anyhow!(err))?;
|
||||||
let service = utils::http::RouterService::new(router).unwrap();
|
let service = utils::http::RouterService::new(router).unwrap();
|
||||||
|
|||||||
@@ -66,6 +66,13 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_global_timelines(request: &Request<Body>) -> Arc<GlobalTimelines> {
|
||||||
|
request
|
||||||
|
.data::<Arc<GlobalTimelines>>()
|
||||||
|
.expect("unknown state type")
|
||||||
|
.clone()
|
||||||
|
}
|
||||||
|
|
||||||
/// Same as TermLsn, but serializes LSN using display serializer
|
/// Same as TermLsn, but serializes LSN using display serializer
|
||||||
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
|
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
|
||||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||||
@@ -123,9 +130,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
|
|||||||
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
|
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
|
||||||
check_permission(&request, Some(tenant_id))?;
|
check_permission(&request, Some(tenant_id))?;
|
||||||
ensure_no_body(&mut request).await?;
|
ensure_no_body(&mut request).await?;
|
||||||
|
let global_timelines = get_global_timelines(&request);
|
||||||
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
|
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
|
||||||
// Using an `InternalServerError` should be fixed when the types support it
|
// Using an `InternalServerError` should be fixed when the types support it
|
||||||
let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
|
let delete_info = global_timelines
|
||||||
|
.delete_force_all_for_tenant(&tenant_id, only_local)
|
||||||
.await
|
.await
|
||||||
.map_err(ApiError::InternalServerError)?;
|
.map_err(ApiError::InternalServerError)?;
|
||||||
json_response(
|
json_response(
|
||||||
@@ -156,7 +165,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
|||||||
.commit_lsn
|
.commit_lsn
|
||||||
.segment_lsn(server_info.wal_seg_size as usize)
|
.segment_lsn(server_info.wal_seg_size as usize)
|
||||||
});
|
});
|
||||||
GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
global_timelines
|
||||||
|
.create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
|
||||||
.await
|
.await
|
||||||
.map_err(ApiError::InternalServerError)?;
|
.map_err(ApiError::InternalServerError)?;
|
||||||
|
|
||||||
@@ -167,7 +178,9 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
|||||||
/// Note: it is possible to do the same with debug_dump.
|
/// Note: it is possible to do the same with debug_dump.
|
||||||
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||||
check_permission(&request, None)?;
|
check_permission(&request, None)?;
|
||||||
let res: Vec<TenantTimelineId> = GlobalTimelines::get_all()
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
let res: Vec<TenantTimelineId> = global_timelines
|
||||||
|
.get_all()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|tli| tli.ttid)
|
.map(|tli| tli.ttid)
|
||||||
.collect();
|
.collect();
|
||||||
@@ -182,7 +195,8 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
|||||||
);
|
);
|
||||||
check_permission(&request, Some(ttid.tenant_id))?;
|
check_permission(&request, Some(ttid.tenant_id))?;
|
||||||
|
|
||||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||||
let (inmem, state) = tli.get_state().await;
|
let (inmem, state) = tli.get_state().await;
|
||||||
let flush_lsn = tli.get_flush_lsn().await;
|
let flush_lsn = tli.get_flush_lsn().await;
|
||||||
|
|
||||||
@@ -233,9 +247,11 @@ async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<
|
|||||||
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
|
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
|
||||||
check_permission(&request, Some(ttid.tenant_id))?;
|
check_permission(&request, Some(ttid.tenant_id))?;
|
||||||
ensure_no_body(&mut request).await?;
|
ensure_no_body(&mut request).await?;
|
||||||
|
let global_timelines = get_global_timelines(&request);
|
||||||
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
|
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
|
||||||
// error handling here when we're able to.
|
// error handling here when we're able to.
|
||||||
let resp = GlobalTimelines::delete(&ttid, only_local)
|
let resp = global_timelines
|
||||||
|
.delete(&ttid, only_local)
|
||||||
.await
|
.await
|
||||||
.map_err(ApiError::InternalServerError)?;
|
.map_err(ApiError::InternalServerError)?;
|
||||||
json_response(StatusCode::OK, resp)
|
json_response(StatusCode::OK, resp)
|
||||||
@@ -247,8 +263,9 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
|||||||
|
|
||||||
let data: pull_timeline::Request = json_request(&mut request).await?;
|
let data: pull_timeline::Request = json_request(&mut request).await?;
|
||||||
let conf = get_conf(&request);
|
let conf = get_conf(&request);
|
||||||
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
|
||||||
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone())
|
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone(), global_timelines)
|
||||||
.await
|
.await
|
||||||
.map_err(ApiError::InternalServerError)?;
|
.map_err(ApiError::InternalServerError)?;
|
||||||
json_response(StatusCode::OK, resp)
|
json_response(StatusCode::OK, resp)
|
||||||
@@ -263,7 +280,8 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
|
|||||||
);
|
);
|
||||||
check_permission(&request, Some(ttid.tenant_id))?;
|
check_permission(&request, Some(ttid.tenant_id))?;
|
||||||
|
|
||||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||||
|
|
||||||
// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
|
// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
|
||||||
// so create the chan and write to it in another task.
|
// so create the chan and write to it in another task.
|
||||||
@@ -293,19 +311,19 @@ async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Bo
|
|||||||
check_permission(&request, None)?;
|
check_permission(&request, None)?;
|
||||||
|
|
||||||
let request_data: TimelineCopyRequest = json_request(&mut request).await?;
|
let request_data: TimelineCopyRequest = json_request(&mut request).await?;
|
||||||
let ttid = TenantTimelineId::new(
|
let source_ttid = TenantTimelineId::new(
|
||||||
parse_request_param(&request, "tenant_id")?,
|
parse_request_param(&request, "tenant_id")?,
|
||||||
parse_request_param(&request, "source_timeline_id")?,
|
parse_request_param(&request, "source_timeline_id")?,
|
||||||
);
|
);
|
||||||
|
|
||||||
let source = GlobalTimelines::get(ttid)?;
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
|
||||||
copy_timeline::handle_request(copy_timeline::Request{
|
copy_timeline::handle_request(copy_timeline::Request{
|
||||||
source,
|
source_ttid,
|
||||||
until_lsn: request_data.until_lsn,
|
until_lsn: request_data.until_lsn,
|
||||||
destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
|
destination_ttid: TenantTimelineId::new(source_ttid.tenant_id, request_data.target_timeline_id),
|
||||||
})
|
}, global_timelines)
|
||||||
.instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
|
.instrument(info_span!("copy_timeline", from=%source_ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
|
||||||
.await
|
.await
|
||||||
.map_err(ApiError::InternalServerError)?;
|
.map_err(ApiError::InternalServerError)?;
|
||||||
|
|
||||||
@@ -322,7 +340,8 @@ async fn patch_control_file_handler(
|
|||||||
parse_request_param(&request, "timeline_id")?,
|
parse_request_param(&request, "timeline_id")?,
|
||||||
);
|
);
|
||||||
|
|
||||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||||
|
|
||||||
let patch_request: patch_control_file::Request = json_request(&mut request).await?;
|
let patch_request: patch_control_file::Request = json_request(&mut request).await?;
|
||||||
let response = patch_control_file::handle_request(tli, patch_request)
|
let response = patch_control_file::handle_request(tli, patch_request)
|
||||||
@@ -341,7 +360,8 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
|
|||||||
parse_request_param(&request, "timeline_id")?,
|
parse_request_param(&request, "timeline_id")?,
|
||||||
);
|
);
|
||||||
|
|
||||||
let tli = GlobalTimelines::get(ttid)?;
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
let tli = global_timelines.get(ttid)?;
|
||||||
tli.write_shared_state()
|
tli.write_shared_state()
|
||||||
.await
|
.await
|
||||||
.sk
|
.sk
|
||||||
@@ -359,6 +379,7 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
|
|||||||
);
|
);
|
||||||
check_permission(&request, Some(ttid.tenant_id))?;
|
check_permission(&request, Some(ttid.tenant_id))?;
|
||||||
|
|
||||||
|
let global_timelines = get_global_timelines(&request);
|
||||||
let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
|
let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
|
||||||
let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
|
let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
|
||||||
|
|
||||||
@@ -371,7 +392,7 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
|
|||||||
)))?,
|
)))?,
|
||||||
};
|
};
|
||||||
|
|
||||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||||
let tli = tli
|
let tli = tli
|
||||||
.wal_residence_guard()
|
.wal_residence_guard()
|
||||||
.await
|
.await
|
||||||
@@ -393,7 +414,8 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respons
|
|||||||
);
|
);
|
||||||
check_permission(&request, Some(ttid.tenant_id))?;
|
check_permission(&request, Some(ttid.tenant_id))?;
|
||||||
|
|
||||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||||
|
|
||||||
let response = tli
|
let response = tli
|
||||||
.backup_partial_reset()
|
.backup_partial_reset()
|
||||||
@@ -415,7 +437,8 @@ async fn timeline_term_bump_handler(
|
|||||||
|
|
||||||
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
|
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
|
||||||
|
|
||||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||||
let response = tli
|
let response = tli
|
||||||
.term_bump(request_data.term)
|
.term_bump(request_data.term)
|
||||||
.await
|
.await
|
||||||
@@ -452,7 +475,8 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
|||||||
standby_horizon: sk_info.standby_horizon.0,
|
standby_horizon: sk_info.standby_horizon.0,
|
||||||
};
|
};
|
||||||
|
|
||||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||||
tli.record_safekeeper_info(proto_sk_info)
|
tli.record_safekeeper_info(proto_sk_info)
|
||||||
.await
|
.await
|
||||||
.map_err(ApiError::InternalServerError)?;
|
.map_err(ApiError::InternalServerError)?;
|
||||||
@@ -506,6 +530,8 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
|
|||||||
let dump_term_history = dump_term_history.unwrap_or(true);
|
let dump_term_history = dump_term_history.unwrap_or(true);
|
||||||
let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
|
let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
|
||||||
|
|
||||||
|
let global_timelines = get_global_timelines(&request);
|
||||||
|
|
||||||
let args = debug_dump::Args {
|
let args = debug_dump::Args {
|
||||||
dump_all,
|
dump_all,
|
||||||
dump_control_file,
|
dump_control_file,
|
||||||
@@ -517,7 +543,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
|
|||||||
timeline_id,
|
timeline_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
let resp = debug_dump::build(args)
|
let resp = debug_dump::build(args, global_timelines)
|
||||||
.await
|
.await
|
||||||
.map_err(ApiError::InternalServerError)?;
|
.map_err(ApiError::InternalServerError)?;
|
||||||
|
|
||||||
@@ -570,7 +596,10 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Safekeeper http router.
|
/// Safekeeper http router.
|
||||||
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
|
pub fn make_router(
|
||||||
|
conf: Arc<SafeKeeperConf>,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
|
) -> RouterBuilder<hyper::Body, ApiError> {
|
||||||
let mut router = endpoint::make_router();
|
let mut router = endpoint::make_router();
|
||||||
if conf.http_auth.is_some() {
|
if conf.http_auth.is_some() {
|
||||||
router = router.middleware(auth_middleware(|request| {
|
router = router.middleware(auth_middleware(|request| {
|
||||||
@@ -592,7 +621,8 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
|||||||
// located nearby (/safekeeper/src/http/openapi_spec.yaml).
|
// located nearby (/safekeeper/src/http/openapi_spec.yaml).
|
||||||
let auth = conf.http_auth.clone();
|
let auth = conf.http_auth.clone();
|
||||||
router
|
router
|
||||||
.data(Arc::new(conf))
|
.data(conf)
|
||||||
|
.data(global_timelines)
|
||||||
.data(auth)
|
.data(auth)
|
||||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ use postgres_backend::QueryError;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use utils::id::TenantTimelineId;
|
|
||||||
|
|
||||||
use crate::handler::SafekeeperPostgresHandler;
|
use crate::handler::SafekeeperPostgresHandler;
|
||||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
|
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
|
||||||
@@ -21,7 +20,6 @@ use crate::safekeeper::{
|
|||||||
use crate::safekeeper::{Term, TermHistory, TermLsn};
|
use crate::safekeeper::{Term, TermHistory, TermLsn};
|
||||||
use crate::state::TimelinePersistentState;
|
use crate::state::TimelinePersistentState;
|
||||||
use crate::timeline::WalResidentTimeline;
|
use crate::timeline::WalResidentTimeline;
|
||||||
use crate::GlobalTimelines;
|
|
||||||
use postgres_backend::PostgresBackend;
|
use postgres_backend::PostgresBackend;
|
||||||
use postgres_ffi::encode_logical_message;
|
use postgres_ffi::encode_logical_message;
|
||||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||||
@@ -70,7 +68,7 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
|
|||||||
info!("JSON_CTRL request: {append_request:?}");
|
info!("JSON_CTRL request: {append_request:?}");
|
||||||
|
|
||||||
// need to init safekeeper state before AppendRequest
|
// need to init safekeeper state before AppendRequest
|
||||||
let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
|
let tli = prepare_safekeeper(spg, append_request.pg_version).await?;
|
||||||
|
|
||||||
// if send_proposer_elected is true, we need to update local history
|
// if send_proposer_elected is true, we need to update local history
|
||||||
if append_request.send_proposer_elected {
|
if append_request.send_proposer_elected {
|
||||||
@@ -99,20 +97,22 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
|
|||||||
/// Prepare safekeeper to process append requests without crashes,
|
/// Prepare safekeeper to process append requests without crashes,
|
||||||
/// by sending ProposerGreeting with default server.wal_seg_size.
|
/// by sending ProposerGreeting with default server.wal_seg_size.
|
||||||
async fn prepare_safekeeper(
|
async fn prepare_safekeeper(
|
||||||
ttid: TenantTimelineId,
|
spg: &SafekeeperPostgresHandler,
|
||||||
pg_version: u32,
|
pg_version: u32,
|
||||||
) -> anyhow::Result<WalResidentTimeline> {
|
) -> anyhow::Result<WalResidentTimeline> {
|
||||||
let tli = GlobalTimelines::create(
|
let tli = spg
|
||||||
ttid,
|
.global_timelines
|
||||||
ServerInfo {
|
.create(
|
||||||
pg_version,
|
spg.ttid,
|
||||||
wal_seg_size: WAL_SEGMENT_SIZE as u32,
|
ServerInfo {
|
||||||
system_id: 0,
|
pg_version,
|
||||||
},
|
wal_seg_size: WAL_SEGMENT_SIZE as u32,
|
||||||
Lsn::INVALID,
|
system_id: 0,
|
||||||
Lsn::INVALID,
|
},
|
||||||
)
|
Lsn::INVALID,
|
||||||
.await?;
|
Lsn::INVALID,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
tli.wal_residence_guard().await
|
tli.wal_residence_guard().await
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -455,6 +455,7 @@ pub struct FullTimelineInfo {
|
|||||||
|
|
||||||
/// Collects metrics for all active timelines.
|
/// Collects metrics for all active timelines.
|
||||||
pub struct TimelineCollector {
|
pub struct TimelineCollector {
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
descs: Vec<Desc>,
|
descs: Vec<Desc>,
|
||||||
commit_lsn: GenericGaugeVec<AtomicU64>,
|
commit_lsn: GenericGaugeVec<AtomicU64>,
|
||||||
backup_lsn: GenericGaugeVec<AtomicU64>,
|
backup_lsn: GenericGaugeVec<AtomicU64>,
|
||||||
@@ -478,14 +479,8 @@ pub struct TimelineCollector {
|
|||||||
active_timelines_count: IntGauge,
|
active_timelines_count: IntGauge,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for TimelineCollector {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TimelineCollector {
|
impl TimelineCollector {
|
||||||
pub fn new() -> TimelineCollector {
|
pub fn new(global_timelines: Arc<GlobalTimelines>) -> TimelineCollector {
|
||||||
let mut descs = Vec::new();
|
let mut descs = Vec::new();
|
||||||
|
|
||||||
let commit_lsn = GenericGaugeVec::new(
|
let commit_lsn = GenericGaugeVec::new(
|
||||||
@@ -676,6 +671,7 @@ impl TimelineCollector {
|
|||||||
descs.extend(active_timelines_count.desc().into_iter().cloned());
|
descs.extend(active_timelines_count.desc().into_iter().cloned());
|
||||||
|
|
||||||
TimelineCollector {
|
TimelineCollector {
|
||||||
|
global_timelines,
|
||||||
descs,
|
descs,
|
||||||
commit_lsn,
|
commit_lsn,
|
||||||
backup_lsn,
|
backup_lsn,
|
||||||
@@ -728,17 +724,18 @@ impl Collector for TimelineCollector {
|
|||||||
self.written_wal_seconds.reset();
|
self.written_wal_seconds.reset();
|
||||||
self.flushed_wal_seconds.reset();
|
self.flushed_wal_seconds.reset();
|
||||||
|
|
||||||
let timelines_count = GlobalTimelines::get_all().len();
|
let timelines_count = self.global_timelines.get_all().len();
|
||||||
let mut active_timelines_count = 0;
|
let mut active_timelines_count = 0;
|
||||||
|
|
||||||
// Prometheus Collector is sync, and data is stored under async lock. To
|
// Prometheus Collector is sync, and data is stored under async lock. To
|
||||||
// bridge the gap with a crutch, collect data in spawned thread with
|
// bridge the gap with a crutch, collect data in spawned thread with
|
||||||
// local tokio runtime.
|
// local tokio runtime.
|
||||||
|
let global_timelines = self.global_timelines.clone();
|
||||||
let infos = std::thread::spawn(|| {
|
let infos = std::thread::spawn(|| {
|
||||||
let rt = tokio::runtime::Builder::new_current_thread()
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
.build()
|
.build()
|
||||||
.expect("failed to create rt");
|
.expect("failed to create rt");
|
||||||
rt.block_on(collect_timeline_metrics())
|
rt.block_on(collect_timeline_metrics(global_timelines))
|
||||||
})
|
})
|
||||||
.join()
|
.join()
|
||||||
.expect("collect_timeline_metrics thread panicked");
|
.expect("collect_timeline_metrics thread panicked");
|
||||||
@@ -857,9 +854,9 @@ impl Collector for TimelineCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
|
async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
|
||||||
let mut res = vec![];
|
let mut res = vec![];
|
||||||
let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
|
let active_timelines = global_timelines.get_global_broker_active_set().get_all();
|
||||||
|
|
||||||
for tli in active_timelines {
|
for tli in active_timelines {
|
||||||
if let Some(info) = tli.info_for_metrics().await {
|
if let Some(info) = tli.info_for_metrics().await {
|
||||||
|
|||||||
@@ -409,8 +409,9 @@ pub struct DebugDumpResponse {
|
|||||||
pub async fn handle_request(
|
pub async fn handle_request(
|
||||||
request: Request,
|
request: Request,
|
||||||
sk_auth_token: Option<SecretString>,
|
sk_auth_token: Option<SecretString>,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
) -> Result<Response> {
|
) -> Result<Response> {
|
||||||
let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
|
let existing_tli = global_timelines.get(TenantTimelineId::new(
|
||||||
request.tenant_id,
|
request.tenant_id,
|
||||||
request.timeline_id,
|
request.timeline_id,
|
||||||
));
|
));
|
||||||
@@ -453,13 +454,14 @@ pub async fn handle_request(
|
|||||||
assert!(status.tenant_id == request.tenant_id);
|
assert!(status.tenant_id == request.tenant_id);
|
||||||
assert!(status.timeline_id == request.timeline_id);
|
assert!(status.timeline_id == request.timeline_id);
|
||||||
|
|
||||||
pull_timeline(status, safekeeper_host, sk_auth_token).await
|
pull_timeline(status, safekeeper_host, sk_auth_token, global_timelines).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn pull_timeline(
|
async fn pull_timeline(
|
||||||
status: TimelineStatus,
|
status: TimelineStatus,
|
||||||
host: String,
|
host: String,
|
||||||
sk_auth_token: Option<SecretString>,
|
sk_auth_token: Option<SecretString>,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
) -> Result<Response> {
|
) -> Result<Response> {
|
||||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||||
info!(
|
info!(
|
||||||
@@ -472,7 +474,7 @@ async fn pull_timeline(
|
|||||||
status.acceptor_state.epoch
|
status.acceptor_state.epoch
|
||||||
);
|
);
|
||||||
|
|
||||||
let conf = &GlobalTimelines::get_global_config();
|
let conf = &global_timelines.get_global_config();
|
||||||
|
|
||||||
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
|
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
|
||||||
|
|
||||||
@@ -531,7 +533,9 @@ async fn pull_timeline(
|
|||||||
assert!(status.commit_lsn <= status.flush_lsn);
|
assert!(status.commit_lsn <= status.flush_lsn);
|
||||||
|
|
||||||
// Finally, load the timeline.
|
// Finally, load the timeline.
|
||||||
let _tli = GlobalTimelines::load_temp_timeline(ttid, &tli_dir_path, false).await?;
|
let _tli = global_timelines
|
||||||
|
.load_temp_timeline(ttid, &tli_dir_path, false)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Response {
|
Ok(Response {
|
||||||
safekeeper_host: host,
|
safekeeper_host: host,
|
||||||
|
|||||||
@@ -267,6 +267,7 @@ impl SafekeeperPostgresHandler {
|
|||||||
pgb_reader: &mut pgb_reader,
|
pgb_reader: &mut pgb_reader,
|
||||||
peer_addr,
|
peer_addr,
|
||||||
acceptor_handle: &mut acceptor_handle,
|
acceptor_handle: &mut acceptor_handle,
|
||||||
|
global_timelines: self.global_timelines.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Read first message and create timeline if needed.
|
// Read first message and create timeline if needed.
|
||||||
@@ -331,6 +332,7 @@ struct NetworkReader<'a, IO> {
|
|||||||
// WalAcceptor is spawned when we learn server info from walproposer and
|
// WalAcceptor is spawned when we learn server info from walproposer and
|
||||||
// create timeline; handle is put here.
|
// create timeline; handle is put here.
|
||||||
acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
|
acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
||||||
@@ -350,10 +352,11 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
|||||||
system_id: greeting.system_id,
|
system_id: greeting.system_id,
|
||||||
wal_seg_size: greeting.wal_seg_size,
|
wal_seg_size: greeting.wal_seg_size,
|
||||||
};
|
};
|
||||||
let tli =
|
let tli = self
|
||||||
GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
|
.global_timelines
|
||||||
.await
|
.create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
|
||||||
.context("create timeline")?;
|
.await
|
||||||
|
.context("create timeline")?;
|
||||||
tli.wal_residence_guard().await?
|
tli.wal_residence_guard().await?
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ use crate::timeline::WalResidentTimeline;
|
|||||||
use crate::wal_reader_stream::WalReaderStreamBuilder;
|
use crate::wal_reader_stream::WalReaderStreamBuilder;
|
||||||
use crate::wal_service::ConnectionId;
|
use crate::wal_service::ConnectionId;
|
||||||
use crate::wal_storage::WalReader;
|
use crate::wal_storage::WalReader;
|
||||||
use crate::GlobalTimelines;
|
|
||||||
use anyhow::{bail, Context as AnyhowContext};
|
use anyhow::{bail, Context as AnyhowContext};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::future::Either;
|
use futures::future::Either;
|
||||||
@@ -400,7 +399,10 @@ impl SafekeeperPostgresHandler {
|
|||||||
start_pos: Lsn,
|
start_pos: Lsn,
|
||||||
term: Option<Term>,
|
term: Option<Term>,
|
||||||
) -> Result<(), QueryError> {
|
) -> Result<(), QueryError> {
|
||||||
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
|
let tli = self
|
||||||
|
.global_timelines
|
||||||
|
.get(self.ttid)
|
||||||
|
.map_err(|e| QueryError::Other(e.into()))?;
|
||||||
let residence_guard = tli.wal_residence_guard().await?;
|
let residence_guard = tli.wal_residence_guard().await?;
|
||||||
|
|
||||||
if let Err(end) = self
|
if let Err(end) = self
|
||||||
|
|||||||
@@ -44,8 +44,8 @@ use crate::wal_backup_partial::PartialRemoteSegment;
|
|||||||
|
|
||||||
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
|
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
|
||||||
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
|
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
|
||||||
|
use crate::SafeKeeperConf;
|
||||||
use crate::{debug_dump, timeline_manager, wal_storage};
|
use crate::{debug_dump, timeline_manager, wal_storage};
|
||||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
|
||||||
|
|
||||||
/// Things safekeeper should know about timeline state on peers.
|
/// Things safekeeper should know about timeline state on peers.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
@@ -467,6 +467,7 @@ pub struct Timeline {
|
|||||||
walreceivers: Arc<WalReceivers>,
|
walreceivers: Arc<WalReceivers>,
|
||||||
timeline_dir: Utf8PathBuf,
|
timeline_dir: Utf8PathBuf,
|
||||||
manager_ctl: ManagerCtl,
|
manager_ctl: ManagerCtl,
|
||||||
|
conf: Arc<SafeKeeperConf>,
|
||||||
|
|
||||||
/// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
|
/// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
|
||||||
/// this gate, you must respect [`Timeline::cancel`]
|
/// this gate, you must respect [`Timeline::cancel`]
|
||||||
@@ -489,6 +490,7 @@ impl Timeline {
|
|||||||
timeline_dir: &Utf8Path,
|
timeline_dir: &Utf8Path,
|
||||||
remote_path: &RemotePath,
|
remote_path: &RemotePath,
|
||||||
shared_state: SharedState,
|
shared_state: SharedState,
|
||||||
|
conf: Arc<SafeKeeperConf>,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
|
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
|
||||||
watch::channel(shared_state.sk.state().commit_lsn);
|
watch::channel(shared_state.sk.state().commit_lsn);
|
||||||
@@ -516,6 +518,7 @@ impl Timeline {
|
|||||||
gate: Default::default(),
|
gate: Default::default(),
|
||||||
cancel: CancellationToken::default(),
|
cancel: CancellationToken::default(),
|
||||||
manager_ctl: ManagerCtl::new(),
|
manager_ctl: ManagerCtl::new(),
|
||||||
|
conf,
|
||||||
broker_active: AtomicBool::new(false),
|
broker_active: AtomicBool::new(false),
|
||||||
wal_backup_active: AtomicBool::new(false),
|
wal_backup_active: AtomicBool::new(false),
|
||||||
last_removed_segno: AtomicU64::new(0),
|
last_removed_segno: AtomicU64::new(0),
|
||||||
@@ -524,11 +527,14 @@ impl Timeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Load existing timeline from disk.
|
/// Load existing timeline from disk.
|
||||||
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
|
pub fn load_timeline(
|
||||||
|
conf: Arc<SafeKeeperConf>,
|
||||||
|
ttid: TenantTimelineId,
|
||||||
|
) -> Result<Arc<Timeline>> {
|
||||||
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
||||||
|
|
||||||
let shared_state = SharedState::restore(conf, &ttid)?;
|
let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
|
||||||
let timeline_dir = get_timeline_dir(conf, &ttid);
|
let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
|
||||||
let remote_path = remote_timeline_path(&ttid)?;
|
let remote_path = remote_timeline_path(&ttid)?;
|
||||||
|
|
||||||
Ok(Timeline::new(
|
Ok(Timeline::new(
|
||||||
@@ -536,6 +542,7 @@ impl Timeline {
|
|||||||
&timeline_dir,
|
&timeline_dir,
|
||||||
&remote_path,
|
&remote_path,
|
||||||
shared_state,
|
shared_state,
|
||||||
|
conf,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -604,8 +611,7 @@ impl Timeline {
|
|||||||
// it is cancelled, so WAL storage won't be opened again.
|
// it is cancelled, so WAL storage won't be opened again.
|
||||||
shared_state.sk.close_wal_store();
|
shared_state.sk.close_wal_store();
|
||||||
|
|
||||||
let conf = GlobalTimelines::get_global_config();
|
if !only_local && self.conf.is_wal_backup_enabled() {
|
||||||
if !only_local && conf.is_wal_backup_enabled() {
|
|
||||||
// Note: we concurrently delete remote storage data from multiple
|
// Note: we concurrently delete remote storage data from multiple
|
||||||
// safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
|
// safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
|
||||||
// do some retries anyway.
|
// do some retries anyway.
|
||||||
@@ -951,7 +957,7 @@ impl WalResidentTimeline {
|
|||||||
|
|
||||||
pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
|
pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
|
||||||
let (_, persisted_state) = self.get_state().await;
|
let (_, persisted_state) = self.get_state().await;
|
||||||
let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
|
let enable_remote_read = self.conf.is_wal_backup_enabled();
|
||||||
|
|
||||||
WalReader::new(
|
WalReader::new(
|
||||||
&self.ttid,
|
&self.ttid,
|
||||||
@@ -1061,7 +1067,6 @@ impl ManagerTimeline {
|
|||||||
|
|
||||||
/// Try to switch state Offloaded->Present.
|
/// Try to switch state Offloaded->Present.
|
||||||
pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
|
pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
|
||||||
let conf = GlobalTimelines::get_global_config();
|
|
||||||
let mut shared = self.write_shared_state().await;
|
let mut shared = self.write_shared_state().await;
|
||||||
|
|
||||||
// trying to restore WAL storage
|
// trying to restore WAL storage
|
||||||
@@ -1069,7 +1074,7 @@ impl ManagerTimeline {
|
|||||||
&self.ttid,
|
&self.ttid,
|
||||||
&self.timeline_dir,
|
&self.timeline_dir,
|
||||||
shared.sk.state(),
|
shared.sk.state(),
|
||||||
conf.no_sync,
|
self.conf.no_sync,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// updating control file
|
// updating control file
|
||||||
@@ -1096,7 +1101,7 @@ impl ManagerTimeline {
|
|||||||
// now we can switch shared.sk to Present, shouldn't fail
|
// now we can switch shared.sk to Present, shouldn't fail
|
||||||
let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
|
let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
|
||||||
let cfile_state = prev_sk.take_state();
|
let cfile_state = prev_sk.take_state();
|
||||||
shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, conf.my_id)?);
|
shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ use crate::{control_file, wal_storage, SafeKeeperConf};
|
|||||||
use anyhow::{bail, Context, Result};
|
use anyhow::{bail, Context, Result};
|
||||||
use camino::Utf8PathBuf;
|
use camino::Utf8PathBuf;
|
||||||
use camino_tempfile::Utf8TempDir;
|
use camino_tempfile::Utf8TempDir;
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
@@ -42,23 +41,16 @@ struct GlobalTimelinesState {
|
|||||||
// this map is dropped on restart.
|
// this map is dropped on restart.
|
||||||
tombstones: HashMap<TenantTimelineId, Instant>,
|
tombstones: HashMap<TenantTimelineId, Instant>,
|
||||||
|
|
||||||
conf: Option<SafeKeeperConf>,
|
conf: Arc<SafeKeeperConf>,
|
||||||
broker_active_set: Arc<TimelinesSet>,
|
broker_active_set: Arc<TimelinesSet>,
|
||||||
global_rate_limiter: RateLimiter,
|
global_rate_limiter: RateLimiter,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GlobalTimelinesState {
|
impl GlobalTimelinesState {
|
||||||
/// Get configuration, which must be set once during init.
|
|
||||||
fn get_conf(&self) -> &SafeKeeperConf {
|
|
||||||
self.conf
|
|
||||||
.as_ref()
|
|
||||||
.expect("GlobalTimelinesState conf is not initialized")
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get dependencies for a timeline constructor.
|
/// Get dependencies for a timeline constructor.
|
||||||
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
|
fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
|
||||||
(
|
(
|
||||||
self.get_conf().clone(),
|
self.conf.clone(),
|
||||||
self.broker_active_set.clone(),
|
self.broker_active_set.clone(),
|
||||||
self.global_rate_limiter.clone(),
|
self.global_rate_limiter.clone(),
|
||||||
)
|
)
|
||||||
@@ -82,35 +74,39 @@ impl GlobalTimelinesState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
|
/// A struct used to manage access to the global timelines map.
|
||||||
Mutex::new(GlobalTimelinesState {
|
pub struct GlobalTimelines {
|
||||||
timelines: HashMap::new(),
|
state: Mutex<GlobalTimelinesState>,
|
||||||
tombstones: HashMap::new(),
|
}
|
||||||
conf: None,
|
|
||||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
|
||||||
global_rate_limiter: RateLimiter::new(1, 1),
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
/// A zero-sized struct used to manage access to the global timelines map.
|
|
||||||
pub struct GlobalTimelines;
|
|
||||||
|
|
||||||
impl GlobalTimelines {
|
impl GlobalTimelines {
|
||||||
|
/// Create a new instance of the global timelines map.
|
||||||
|
pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
|
||||||
|
Self {
|
||||||
|
state: Mutex::new(GlobalTimelinesState {
|
||||||
|
timelines: HashMap::new(),
|
||||||
|
tombstones: HashMap::new(),
|
||||||
|
conf,
|
||||||
|
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||||
|
global_rate_limiter: RateLimiter::new(1, 1),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
|
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
|
||||||
pub async fn init(conf: SafeKeeperConf) -> Result<()> {
|
pub async fn init(&self) -> Result<()> {
|
||||||
// clippy isn't smart enough to understand that drop(state) releases the
|
// clippy isn't smart enough to understand that drop(state) releases the
|
||||||
// lock, so use explicit block
|
// lock, so use explicit block
|
||||||
let tenants_dir = {
|
let tenants_dir = {
|
||||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
state.global_rate_limiter = RateLimiter::new(
|
state.global_rate_limiter = RateLimiter::new(
|
||||||
conf.partial_backup_concurrency,
|
state.conf.partial_backup_concurrency,
|
||||||
DEFAULT_EVICTION_CONCURRENCY,
|
DEFAULT_EVICTION_CONCURRENCY,
|
||||||
);
|
);
|
||||||
state.conf = Some(conf);
|
|
||||||
|
|
||||||
// Iterate through all directories and load tenants for all directories
|
// Iterate through all directories and load tenants for all directories
|
||||||
// named as a valid tenant_id.
|
// named as a valid tenant_id.
|
||||||
state.get_conf().workdir.clone()
|
state.conf.workdir.clone()
|
||||||
};
|
};
|
||||||
let mut tenant_count = 0;
|
let mut tenant_count = 0;
|
||||||
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
|
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
|
||||||
@@ -122,7 +118,7 @@ impl GlobalTimelines {
|
|||||||
TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
|
TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
|
||||||
{
|
{
|
||||||
tenant_count += 1;
|
tenant_count += 1;
|
||||||
GlobalTimelines::load_tenant_timelines(tenant_id).await?;
|
self.load_tenant_timelines(tenant_id).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => error!(
|
Err(e) => error!(
|
||||||
@@ -135,7 +131,7 @@ impl GlobalTimelines {
|
|||||||
info!(
|
info!(
|
||||||
"found {} tenants directories, successfully loaded {} timelines",
|
"found {} tenants directories, successfully loaded {} timelines",
|
||||||
tenant_count,
|
tenant_count,
|
||||||
TIMELINES_STATE.lock().unwrap().timelines.len()
|
self.state.lock().unwrap().timelines.len()
|
||||||
);
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -143,13 +139,13 @@ impl GlobalTimelines {
|
|||||||
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
|
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
|
||||||
/// errors if any.
|
/// errors if any.
|
||||||
///
|
///
|
||||||
/// It is async, but TIMELINES_STATE lock is sync and there is no important
|
/// It is async, but self.state lock is sync and there is no important
|
||||||
/// reason to make it async (it is always held for a short while), so we
|
/// reason to make it async (it is always held for a short while), so we
|
||||||
/// just lock and unlock it for each timeline -- this function is called
|
/// just lock and unlock it for each timeline -- this function is called
|
||||||
/// during init when nothing else is running, so this is fine.
|
/// during init when nothing else is running, so this is fine.
|
||||||
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
|
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
|
||||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||||
let state = TIMELINES_STATE.lock().unwrap();
|
let state = self.state.lock().unwrap();
|
||||||
state.get_dependencies()
|
state.get_dependencies()
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -163,10 +159,10 @@ impl GlobalTimelines {
|
|||||||
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
|
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
|
||||||
{
|
{
|
||||||
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||||
match Timeline::load_timeline(&conf, ttid) {
|
match Timeline::load_timeline(conf.clone(), ttid) {
|
||||||
Ok(tli) => {
|
Ok(tli) => {
|
||||||
let mut shared_state = tli.write_shared_state().await;
|
let mut shared_state = tli.write_shared_state().await;
|
||||||
TIMELINES_STATE
|
self.state
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.timelines
|
.timelines
|
||||||
@@ -200,29 +196,30 @@ impl GlobalTimelines {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the number of timelines in the map.
|
/// Get the number of timelines in the map.
|
||||||
pub fn timelines_count() -> usize {
|
pub fn timelines_count(&self) -> usize {
|
||||||
TIMELINES_STATE.lock().unwrap().timelines.len()
|
self.state.lock().unwrap().timelines.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the global safekeeper config.
|
/// Get the global safekeeper config.
|
||||||
pub fn get_global_config() -> SafeKeeperConf {
|
pub fn get_global_config(&self) -> Arc<SafeKeeperConf> {
|
||||||
TIMELINES_STATE.lock().unwrap().get_conf().clone()
|
self.state.lock().unwrap().conf.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
|
pub fn get_global_broker_active_set(&self) -> Arc<TimelinesSet> {
|
||||||
TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
|
self.state.lock().unwrap().broker_active_set.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new timeline with the given id. If the timeline already exists, returns
|
/// Create a new timeline with the given id. If the timeline already exists, returns
|
||||||
/// an existing timeline.
|
/// an existing timeline.
|
||||||
pub(crate) async fn create(
|
pub(crate) async fn create(
|
||||||
|
&self,
|
||||||
ttid: TenantTimelineId,
|
ttid: TenantTimelineId,
|
||||||
server_info: ServerInfo,
|
server_info: ServerInfo,
|
||||||
commit_lsn: Lsn,
|
commit_lsn: Lsn,
|
||||||
local_start_lsn: Lsn,
|
local_start_lsn: Lsn,
|
||||||
) -> Result<Arc<Timeline>> {
|
) -> Result<Arc<Timeline>> {
|
||||||
let (conf, _, _) = {
|
let (conf, _, _) = {
|
||||||
let state = TIMELINES_STATE.lock().unwrap();
|
let state = self.state.lock().unwrap();
|
||||||
if let Ok(timeline) = state.get(&ttid) {
|
if let Ok(timeline) = state.get(&ttid) {
|
||||||
// Timeline already exists, return it.
|
// Timeline already exists, return it.
|
||||||
return Ok(timeline);
|
return Ok(timeline);
|
||||||
@@ -245,7 +242,7 @@ impl GlobalTimelines {
|
|||||||
let state =
|
let state =
|
||||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
|
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
|
||||||
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
|
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
|
||||||
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
||||||
Ok(timeline)
|
Ok(timeline)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -261,13 +258,14 @@ impl GlobalTimelines {
|
|||||||
/// 2) move the directory and load the timeline
|
/// 2) move the directory and load the timeline
|
||||||
/// 3) take lock again and insert the timeline into the global map.
|
/// 3) take lock again and insert the timeline into the global map.
|
||||||
pub async fn load_temp_timeline(
|
pub async fn load_temp_timeline(
|
||||||
|
&self,
|
||||||
ttid: TenantTimelineId,
|
ttid: TenantTimelineId,
|
||||||
tmp_path: &Utf8PathBuf,
|
tmp_path: &Utf8PathBuf,
|
||||||
check_tombstone: bool,
|
check_tombstone: bool,
|
||||||
) -> Result<Arc<Timeline>> {
|
) -> Result<Arc<Timeline>> {
|
||||||
// Check for existence and mark that we're creating it.
|
// Check for existence and mark that we're creating it.
|
||||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
match state.timelines.get(&ttid) {
|
match state.timelines.get(&ttid) {
|
||||||
Some(GlobalMapTimeline::CreationInProgress) => {
|
Some(GlobalMapTimeline::CreationInProgress) => {
|
||||||
bail!(TimelineError::CreationInProgress(ttid));
|
bail!(TimelineError::CreationInProgress(ttid));
|
||||||
@@ -295,10 +293,10 @@ impl GlobalTimelines {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Do the actual move and reflect the result in the map.
|
// Do the actual move and reflect the result in the map.
|
||||||
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, &conf).await {
|
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
|
||||||
Ok(timeline) => {
|
Ok(timeline) => {
|
||||||
let mut timeline_shared_state = timeline.write_shared_state().await;
|
let mut timeline_shared_state = timeline.write_shared_state().await;
|
||||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
state.timelines.get(&ttid),
|
state.timelines.get(&ttid),
|
||||||
Some(GlobalMapTimeline::CreationInProgress)
|
Some(GlobalMapTimeline::CreationInProgress)
|
||||||
@@ -319,7 +317,7 @@ impl GlobalTimelines {
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Init failed, remove the marker from the map
|
// Init failed, remove the marker from the map
|
||||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
state.timelines.get(&ttid),
|
state.timelines.get(&ttid),
|
||||||
Some(GlobalMapTimeline::CreationInProgress)
|
Some(GlobalMapTimeline::CreationInProgress)
|
||||||
@@ -334,10 +332,10 @@ impl GlobalTimelines {
|
|||||||
async fn install_temp_timeline(
|
async fn install_temp_timeline(
|
||||||
ttid: TenantTimelineId,
|
ttid: TenantTimelineId,
|
||||||
tmp_path: &Utf8PathBuf,
|
tmp_path: &Utf8PathBuf,
|
||||||
conf: &SafeKeeperConf,
|
conf: Arc<SafeKeeperConf>,
|
||||||
) -> Result<Arc<Timeline>> {
|
) -> Result<Arc<Timeline>> {
|
||||||
let tenant_path = get_tenant_dir(conf, &ttid.tenant_id);
|
let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
|
||||||
let timeline_path = get_timeline_dir(conf, &ttid);
|
let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
|
||||||
|
|
||||||
// We must have already checked that timeline doesn't exist in the map,
|
// We must have already checked that timeline doesn't exist in the map,
|
||||||
// but there might be existing datadir: if timeline is corrupted it is
|
// but there might be existing datadir: if timeline is corrupted it is
|
||||||
@@ -382,9 +380,9 @@ impl GlobalTimelines {
|
|||||||
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
|
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
|
||||||
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
|
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
|
||||||
/// i.e. loaded in memory and not cancelled.
|
/// i.e. loaded in memory and not cancelled.
|
||||||
pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
||||||
let tli_res = {
|
let tli_res = {
|
||||||
let state = TIMELINES_STATE.lock().unwrap();
|
let state = self.state.lock().unwrap();
|
||||||
state.get(&ttid)
|
state.get(&ttid)
|
||||||
};
|
};
|
||||||
match tli_res {
|
match tli_res {
|
||||||
@@ -399,8 +397,8 @@ impl GlobalTimelines {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns all timelines. This is used for background timeline processes.
|
/// Returns all timelines. This is used for background timeline processes.
|
||||||
pub fn get_all() -> Vec<Arc<Timeline>> {
|
pub fn get_all(&self) -> Vec<Arc<Timeline>> {
|
||||||
let global_lock = TIMELINES_STATE.lock().unwrap();
|
let global_lock = self.state.lock().unwrap();
|
||||||
global_lock
|
global_lock
|
||||||
.timelines
|
.timelines
|
||||||
.values()
|
.values()
|
||||||
@@ -419,8 +417,8 @@ impl GlobalTimelines {
|
|||||||
|
|
||||||
/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
|
/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
|
||||||
/// and that's why it can return cancelled timelines, to retry deleting them.
|
/// and that's why it can return cancelled timelines, to retry deleting them.
|
||||||
fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
|
fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
|
||||||
let global_lock = TIMELINES_STATE.lock().unwrap();
|
let global_lock = self.state.lock().unwrap();
|
||||||
global_lock
|
global_lock
|
||||||
.timelines
|
.timelines
|
||||||
.values()
|
.values()
|
||||||
@@ -435,11 +433,12 @@ impl GlobalTimelines {
|
|||||||
/// Cancels timeline, then deletes the corresponding data directory.
|
/// Cancels timeline, then deletes the corresponding data directory.
|
||||||
/// If only_local, doesn't remove WAL segments in remote storage.
|
/// If only_local, doesn't remove WAL segments in remote storage.
|
||||||
pub(crate) async fn delete(
|
pub(crate) async fn delete(
|
||||||
|
&self,
|
||||||
ttid: &TenantTimelineId,
|
ttid: &TenantTimelineId,
|
||||||
only_local: bool,
|
only_local: bool,
|
||||||
) -> Result<TimelineDeleteForceResult> {
|
) -> Result<TimelineDeleteForceResult> {
|
||||||
let tli_res = {
|
let tli_res = {
|
||||||
let state = TIMELINES_STATE.lock().unwrap();
|
let state = self.state.lock().unwrap();
|
||||||
|
|
||||||
if state.tombstones.contains_key(ttid) {
|
if state.tombstones.contains_key(ttid) {
|
||||||
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
|
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
|
||||||
@@ -472,7 +471,7 @@ impl GlobalTimelines {
|
|||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
// Timeline is not memory, but it may still exist on disk in broken state.
|
// Timeline is not memory, but it may still exist on disk in broken state.
|
||||||
let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid);
|
let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
|
||||||
let dir_existed = delete_dir(dir_path)?;
|
let dir_existed = delete_dir(dir_path)?;
|
||||||
|
|
||||||
Ok(TimelineDeleteForceResult {
|
Ok(TimelineDeleteForceResult {
|
||||||
@@ -485,7 +484,7 @@ impl GlobalTimelines {
|
|||||||
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
|
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
|
||||||
// are used to prevent still-running computes from re-creating the same timeline when they send data,
|
// are used to prevent still-running computes from re-creating the same timeline when they send data,
|
||||||
// and to speed up repeated deletion calls by avoiding re-listing objects.
|
// and to speed up repeated deletion calls by avoiding re-listing objects.
|
||||||
TIMELINES_STATE.lock().unwrap().delete(*ttid);
|
self.state.lock().unwrap().delete(*ttid);
|
||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
@@ -497,17 +496,18 @@ impl GlobalTimelines {
|
|||||||
///
|
///
|
||||||
/// If only_local, doesn't remove WAL segments in remote storage.
|
/// If only_local, doesn't remove WAL segments in remote storage.
|
||||||
pub async fn delete_force_all_for_tenant(
|
pub async fn delete_force_all_for_tenant(
|
||||||
|
&self,
|
||||||
tenant_id: &TenantId,
|
tenant_id: &TenantId,
|
||||||
only_local: bool,
|
only_local: bool,
|
||||||
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
|
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
|
||||||
info!("deleting all timelines for tenant {}", tenant_id);
|
info!("deleting all timelines for tenant {}", tenant_id);
|
||||||
let to_delete = Self::get_all_for_tenant(*tenant_id);
|
let to_delete = self.get_all_for_tenant(*tenant_id);
|
||||||
|
|
||||||
let mut err = None;
|
let mut err = None;
|
||||||
|
|
||||||
let mut deleted = HashMap::new();
|
let mut deleted = HashMap::new();
|
||||||
for tli in &to_delete {
|
for tli in &to_delete {
|
||||||
match Self::delete(&tli.ttid, only_local).await {
|
match self.delete(&tli.ttid, only_local).await {
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
deleted.insert(tli.ttid, result);
|
deleted.insert(tli.ttid, result);
|
||||||
}
|
}
|
||||||
@@ -529,15 +529,15 @@ impl GlobalTimelines {
|
|||||||
// so the directory may be not empty. In this case timelines will have bad state
|
// so the directory may be not empty. In this case timelines will have bad state
|
||||||
// and timeline background jobs can panic.
|
// and timeline background jobs can panic.
|
||||||
delete_dir(get_tenant_dir(
|
delete_dir(get_tenant_dir(
|
||||||
TIMELINES_STATE.lock().unwrap().get_conf(),
|
self.state.lock().unwrap().conf.as_ref(),
|
||||||
tenant_id,
|
tenant_id,
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
Ok(deleted)
|
Ok(deleted)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn housekeeping(tombstone_ttl: &Duration) {
|
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
|
||||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
|
// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
|
||||||
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
|
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
//!
|
//!
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use postgres_backend::QueryError;
|
use postgres_backend::QueryError;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio_io_timeout::TimeoutReader;
|
use tokio_io_timeout::TimeoutReader;
|
||||||
@@ -11,9 +12,9 @@ use tokio_util::sync::CancellationToken;
|
|||||||
use tracing::*;
|
use tracing::*;
|
||||||
use utils::{auth::Scope, measured_stream::MeasuredStream};
|
use utils::{auth::Scope, measured_stream::MeasuredStream};
|
||||||
|
|
||||||
use crate::handler::SafekeeperPostgresHandler;
|
|
||||||
use crate::metrics::TrafficMetrics;
|
use crate::metrics::TrafficMetrics;
|
||||||
use crate::SafeKeeperConf;
|
use crate::SafeKeeperConf;
|
||||||
|
use crate::{handler::SafekeeperPostgresHandler, GlobalTimelines};
|
||||||
use postgres_backend::{AuthType, PostgresBackend};
|
use postgres_backend::{AuthType, PostgresBackend};
|
||||||
|
|
||||||
/// Accept incoming TCP connections and spawn them into a background thread.
|
/// Accept incoming TCP connections and spawn them into a background thread.
|
||||||
@@ -22,9 +23,10 @@ use postgres_backend::{AuthType, PostgresBackend};
|
|||||||
/// to any tenant are allowed) or Tenant (only tokens giving access to specific
|
/// to any tenant are allowed) or Tenant (only tokens giving access to specific
|
||||||
/// tenant are allowed). Doesn't matter if auth is disabled in conf.
|
/// tenant are allowed). Doesn't matter if auth is disabled in conf.
|
||||||
pub async fn task_main(
|
pub async fn task_main(
|
||||||
conf: SafeKeeperConf,
|
conf: Arc<SafeKeeperConf>,
|
||||||
pg_listener: std::net::TcpListener,
|
pg_listener: std::net::TcpListener,
|
||||||
allowed_auth_scope: Scope,
|
allowed_auth_scope: Scope,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// Tokio's from_std won't do this for us, per its comment.
|
// Tokio's from_std won't do this for us, per its comment.
|
||||||
pg_listener.set_nonblocking(true)?;
|
pg_listener.set_nonblocking(true)?;
|
||||||
@@ -37,10 +39,10 @@ pub async fn task_main(
|
|||||||
debug!("accepted connection from {}", peer_addr);
|
debug!("accepted connection from {}", peer_addr);
|
||||||
let conf = conf.clone();
|
let conf = conf.clone();
|
||||||
let conn_id = issue_connection_id(&mut connection_count);
|
let conn_id = issue_connection_id(&mut connection_count);
|
||||||
|
let global_timelines = global_timelines.clone();
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope).await {
|
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await {
|
||||||
error!("connection handler exited: {}", err);
|
error!("connection handler exited: {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -53,9 +55,10 @@ pub async fn task_main(
|
|||||||
///
|
///
|
||||||
async fn handle_socket(
|
async fn handle_socket(
|
||||||
socket: TcpStream,
|
socket: TcpStream,
|
||||||
conf: SafeKeeperConf,
|
conf: Arc<SafeKeeperConf>,
|
||||||
conn_id: ConnectionId,
|
conn_id: ConnectionId,
|
||||||
allowed_auth_scope: Scope,
|
allowed_auth_scope: Scope,
|
||||||
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
) -> Result<(), QueryError> {
|
) -> Result<(), QueryError> {
|
||||||
socket.set_nodelay(true)?;
|
socket.set_nodelay(true)?;
|
||||||
let peer_addr = socket.peer_addr()?;
|
let peer_addr = socket.peer_addr()?;
|
||||||
@@ -96,8 +99,13 @@ async fn handle_socket(
|
|||||||
Some(_) => AuthType::NeonJWT,
|
Some(_) => AuthType::NeonJWT,
|
||||||
};
|
};
|
||||||
let auth_pair = auth_key.map(|key| (allowed_auth_scope, key));
|
let auth_pair = auth_key.map(|key| (allowed_auth_scope, key));
|
||||||
let mut conn_handler =
|
let mut conn_handler = SafekeeperPostgresHandler::new(
|
||||||
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()), auth_pair);
|
conf,
|
||||||
|
conn_id,
|
||||||
|
Some(traffic_metrics.clone()),
|
||||||
|
auth_pair,
|
||||||
|
global_timelines,
|
||||||
|
);
|
||||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||||
// libpq protocol between safekeeper and walproposer / pageserver
|
// libpq protocol between safekeeper and walproposer / pageserver
|
||||||
// We don't use shutdown.
|
// We don't use shutdown.
|
||||||
|
|||||||
@@ -636,6 +636,13 @@ impl Persistence {
|
|||||||
.into_boxed(),
|
.into_boxed(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Clear generation_pageserver if we are moving into a state where we won't have
|
||||||
|
// any attached pageservers.
|
||||||
|
let input_generation_pageserver = match input_placement_policy {
|
||||||
|
None | Some(PlacementPolicy::Attached(_)) => None,
|
||||||
|
Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None),
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(AsChangeset)]
|
#[derive(AsChangeset)]
|
||||||
#[diesel(table_name = crate::schema::tenant_shards)]
|
#[diesel(table_name = crate::schema::tenant_shards)]
|
||||||
struct ShardUpdate {
|
struct ShardUpdate {
|
||||||
@@ -643,6 +650,7 @@ impl Persistence {
|
|||||||
placement_policy: Option<String>,
|
placement_policy: Option<String>,
|
||||||
config: Option<String>,
|
config: Option<String>,
|
||||||
scheduling_policy: Option<String>,
|
scheduling_policy: Option<String>,
|
||||||
|
generation_pageserver: Option<Option<i64>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
let update = ShardUpdate {
|
let update = ShardUpdate {
|
||||||
@@ -655,6 +663,7 @@ impl Persistence {
|
|||||||
.map(|c| serde_json::to_string(&c).unwrap()),
|
.map(|c| serde_json::to_string(&c).unwrap()),
|
||||||
scheduling_policy: input_scheduling_policy
|
scheduling_policy: input_scheduling_policy
|
||||||
.map(|p| serde_json::to_string(&p).unwrap()),
|
.map(|p| serde_json::to_string(&p).unwrap()),
|
||||||
|
generation_pageserver: input_generation_pageserver,
|
||||||
};
|
};
|
||||||
|
|
||||||
query.set(update).execute(conn)?;
|
query.set(update).execute(conn)?;
|
||||||
|
|||||||
@@ -513,6 +513,9 @@ struct ShardUpdate {
|
|||||||
|
|
||||||
/// If this is None, generation is not updated.
|
/// If this is None, generation is not updated.
|
||||||
generation: Option<Generation>,
|
generation: Option<Generation>,
|
||||||
|
|
||||||
|
/// If this is None, scheduling policy is not updated.
|
||||||
|
scheduling_policy: Option<ShardSchedulingPolicy>,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum StopReconciliationsReason {
|
enum StopReconciliationsReason {
|
||||||
@@ -2376,6 +2379,23 @@ impl Service {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Ordinarily we do not update scheduling policy, but when making major changes
|
||||||
|
// like detaching or demoting to secondary-only, we need to force the scheduling
|
||||||
|
// mode to Active, or the caller's expected outcome (detach it) will not happen.
|
||||||
|
let scheduling_policy = match req.config.mode {
|
||||||
|
LocationConfigMode::Detached | LocationConfigMode::Secondary => {
|
||||||
|
// Special case: when making major changes like detaching or demoting to secondary-only,
|
||||||
|
// we need to force the scheduling mode to Active, or nothing will happen.
|
||||||
|
Some(ShardSchedulingPolicy::Active)
|
||||||
|
}
|
||||||
|
LocationConfigMode::AttachedMulti
|
||||||
|
| LocationConfigMode::AttachedSingle
|
||||||
|
| LocationConfigMode::AttachedStale => {
|
||||||
|
// While attached, continue to respect whatever the existing scheduling mode is.
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let mut create = true;
|
let mut create = true;
|
||||||
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||||
// Saw an existing shard: this is not a creation
|
// Saw an existing shard: this is not a creation
|
||||||
@@ -2401,6 +2421,7 @@ impl Service {
|
|||||||
placement_policy: placement_policy.clone(),
|
placement_policy: placement_policy.clone(),
|
||||||
tenant_config: req.config.tenant_conf.clone(),
|
tenant_config: req.config.tenant_conf.clone(),
|
||||||
generation: set_generation,
|
generation: set_generation,
|
||||||
|
scheduling_policy,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2497,6 +2518,7 @@ impl Service {
|
|||||||
placement_policy,
|
placement_policy,
|
||||||
tenant_config,
|
tenant_config,
|
||||||
generation,
|
generation,
|
||||||
|
scheduling_policy,
|
||||||
} in &updates
|
} in &updates
|
||||||
{
|
{
|
||||||
self.persistence
|
self.persistence
|
||||||
@@ -2505,7 +2527,7 @@ impl Service {
|
|||||||
Some(placement_policy.clone()),
|
Some(placement_policy.clone()),
|
||||||
Some(tenant_config.clone()),
|
Some(tenant_config.clone()),
|
||||||
*generation,
|
*generation,
|
||||||
None,
|
*scheduling_policy,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
@@ -2521,6 +2543,7 @@ impl Service {
|
|||||||
placement_policy,
|
placement_policy,
|
||||||
tenant_config,
|
tenant_config,
|
||||||
generation: update_generation,
|
generation: update_generation,
|
||||||
|
scheduling_policy,
|
||||||
} in updates
|
} in updates
|
||||||
{
|
{
|
||||||
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
|
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
|
||||||
@@ -2539,6 +2562,10 @@ impl Service {
|
|||||||
shard.generation = Some(generation);
|
shard.generation = Some(generation);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(scheduling_policy) = scheduling_policy {
|
||||||
|
shard.set_scheduling_policy(scheduling_policy);
|
||||||
|
}
|
||||||
|
|
||||||
shard.schedule(scheduler, &mut schedule_context)?;
|
shard.schedule(scheduler, &mut schedule_context)?;
|
||||||
|
|
||||||
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
|
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
|
||||||
@@ -2992,9 +3019,17 @@ impl Service {
|
|||||||
|
|
||||||
let TenantPolicyRequest {
|
let TenantPolicyRequest {
|
||||||
placement,
|
placement,
|
||||||
scheduling,
|
mut scheduling,
|
||||||
} = req;
|
} = req;
|
||||||
|
|
||||||
|
if let Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) = placement {
|
||||||
|
// When someone configures a tenant to detach, we force the scheduling policy to enable
|
||||||
|
// this to take effect.
|
||||||
|
if scheduling.is_none() {
|
||||||
|
scheduling = Some(ShardSchedulingPolicy::Active);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
self.persistence
|
self.persistence
|
||||||
.update_tenant_shard(
|
.update_tenant_shard(
|
||||||
TenantFilter::Tenant(tenant_id),
|
TenantFilter::Tenant(tenant_id),
|
||||||
|
|||||||
21
test_runner/cloud_regress/README.md
Normal file
21
test_runner/cloud_regress/README.md
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
# How to run the `pg_regress` tests on a cloud Neon instance.
|
||||||
|
|
||||||
|
* Create a Neon project on staging.
|
||||||
|
* Grant the superuser privileges to the DB user.
|
||||||
|
* (Optional) create a branch for testing
|
||||||
|
* Configure the endpoint by updating the control-plane database with the following settings:
|
||||||
|
* `Timeone`: `America/Los_Angeles`
|
||||||
|
* `DateStyle`: `Postgres,MDY`
|
||||||
|
* `compute_query_id`: `off`
|
||||||
|
* Checkout the actual `Neon` sources
|
||||||
|
* Patch the sql and expected files for the specific PostgreSQL version, e.g. for v17:
|
||||||
|
```bash
|
||||||
|
$ cd vendor/postgres-v17
|
||||||
|
$ patch -p1 <../../compute/patches/cloud_regress_pg17.patch
|
||||||
|
```
|
||||||
|
* Set the environment variable `BENCHMARK_CONNSTR` to the connection URI of your project.
|
||||||
|
* Set the environment variable `PG_VERSION` to the version of your project.
|
||||||
|
* Run
|
||||||
|
```bash
|
||||||
|
$ pytest -m remote_cluster -k cloud_regress
|
||||||
|
```
|
||||||
@@ -5,68 +5,15 @@ Run the regression tests on the cloud instance of Neon
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import psycopg2
|
|
||||||
import pytest
|
import pytest
|
||||||
from fixtures.log_helper import log
|
|
||||||
from fixtures.neon_fixtures import RemotePostgres
|
from fixtures.neon_fixtures import RemotePostgres
|
||||||
from fixtures.pg_version import PgVersion
|
from fixtures.pg_version import PgVersion
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def setup(remote_pg: RemotePostgres):
|
|
||||||
"""
|
|
||||||
Setup and teardown of the tests
|
|
||||||
"""
|
|
||||||
with psycopg2.connect(remote_pg.connstr()) as conn:
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
log.info("Creating the extension")
|
|
||||||
cur.execute("CREATE EXTENSION IF NOT EXISTS regress_so")
|
|
||||||
conn.commit()
|
|
||||||
# TODO: Migrate to branches and remove this code
|
|
||||||
log.info("Looking for subscriptions in the regress database")
|
|
||||||
cur.execute(
|
|
||||||
"SELECT subname FROM pg_catalog.pg_subscription WHERE "
|
|
||||||
"subdbid = (SELECT oid FROM pg_catalog.pg_database WHERE datname='regression');"
|
|
||||||
)
|
|
||||||
if cur.rowcount > 0:
|
|
||||||
with psycopg2.connect(
|
|
||||||
dbname="regression",
|
|
||||||
host=remote_pg.default_options["host"],
|
|
||||||
user=remote_pg.default_options["user"],
|
|
||||||
password=remote_pg.default_options["password"],
|
|
||||||
) as regress_conn:
|
|
||||||
with regress_conn.cursor() as regress_cur:
|
|
||||||
for sub in cur:
|
|
||||||
regress_cur.execute(f"ALTER SUBSCRIPTION {sub[0]} DISABLE")
|
|
||||||
regress_cur.execute(
|
|
||||||
f"ALTER SUBSCRIPTION {sub[0]} SET (slot_name = NONE)"
|
|
||||||
)
|
|
||||||
regress_cur.execute(f"DROP SUBSCRIPTION {sub[0]}")
|
|
||||||
regress_conn.commit()
|
|
||||||
|
|
||||||
yield
|
|
||||||
# TODO: Migrate to branches and remove this code
|
|
||||||
log.info("Looking for extra roles...")
|
|
||||||
with psycopg2.connect(remote_pg.connstr()) as conn:
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
cur.execute(
|
|
||||||
"SELECT rolname FROM pg_catalog.pg_roles WHERE oid > 16384 AND rolname <> 'neondb_owner'"
|
|
||||||
)
|
|
||||||
roles: list[Any] = []
|
|
||||||
for role in cur:
|
|
||||||
log.info("Role found: %s", role[0])
|
|
||||||
roles.append(role[0])
|
|
||||||
for role in roles:
|
|
||||||
cur.execute(f"DROP ROLE {role}")
|
|
||||||
conn.commit()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.timeout(7200)
|
@pytest.mark.timeout(7200)
|
||||||
@pytest.mark.remote_cluster
|
@pytest.mark.remote_cluster
|
||||||
def test_cloud_regress(
|
def test_cloud_regress(
|
||||||
setup,
|
|
||||||
remote_pg: RemotePostgres,
|
remote_pg: RemotePostgres,
|
||||||
pg_version: PgVersion,
|
pg_version: PgVersion,
|
||||||
pg_distrib_dir: Path,
|
pg_distrib_dir: Path,
|
||||||
|
|||||||
@@ -175,6 +175,8 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
|||||||
counter("pageserver_tenant_throttling_count_accounted_finish"),
|
counter("pageserver_tenant_throttling_count_accounted_finish"),
|
||||||
counter("pageserver_tenant_throttling_wait_usecs_sum"),
|
counter("pageserver_tenant_throttling_wait_usecs_sum"),
|
||||||
counter("pageserver_tenant_throttling_count"),
|
counter("pageserver_tenant_throttling_count"),
|
||||||
|
counter("pageserver_timeline_wal_records_received"),
|
||||||
|
counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
|
||||||
*histogram("pageserver_page_service_batch_size"),
|
*histogram("pageserver_page_service_batch_size"),
|
||||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||||
# "pageserver_directory_entries_count", -- only used if above a certain threshold
|
# "pageserver_directory_entries_count", -- only used if above a certain threshold
|
||||||
|
|||||||
@@ -54,23 +54,15 @@ def wait_for_upload(
|
|||||||
tenant: TenantId | TenantShardId,
|
tenant: TenantId | TenantShardId,
|
||||||
timeline: TimelineId,
|
timeline: TimelineId,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
|
timeout=20,
|
||||||
):
|
):
|
||||||
"""waits for local timeline upload up to specified lsn"""
|
"""Waits for local timeline upload up to specified LSN"""
|
||||||
|
|
||||||
current_lsn = Lsn(0)
|
def is_uploaded():
|
||||||
for i in range(20):
|
remote_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
|
||||||
current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
|
assert remote_lsn >= lsn, f"remote_consistent_lsn at {remote_lsn}"
|
||||||
if current_lsn >= lsn:
|
|
||||||
log.info("wait finished")
|
wait_until(is_uploaded, name=f"upload to {lsn}", timeout=timeout)
|
||||||
return
|
|
||||||
lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
|
||||||
log.info(
|
|
||||||
f"waiting for remote_consistent_lsn to reach {lsn}, now {current_lsn}, last_record_lsn={lr_lsn}, iteration {i + 1}"
|
|
||||||
)
|
|
||||||
time.sleep(1)
|
|
||||||
raise Exception(
|
|
||||||
f"timed out while waiting for {tenant}/{timeline} remote_consistent_lsn to reach {lsn}, was {current_lsn}"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _tenant_in_expected_state(tenant_info: dict[str, Any], expected_state: str):
|
def _tenant_in_expected_state(tenant_info: dict[str, Any], expected_state: str):
|
||||||
|
|||||||
142
test_runner/performance/test_ingest_insert_bulk.py
Normal file
142
test_runner/performance/test_ingest_insert_bulk.py
Normal file
@@ -0,0 +1,142 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import random
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||||
|
from fixtures.common_types import Lsn
|
||||||
|
from fixtures.log_helper import log
|
||||||
|
from fixtures.neon_fixtures import (
|
||||||
|
NeonEnvBuilder,
|
||||||
|
wait_for_last_flush_lsn,
|
||||||
|
)
|
||||||
|
from fixtures.pageserver.utils import (
|
||||||
|
wait_for_last_record_lsn,
|
||||||
|
wait_for_upload,
|
||||||
|
wait_for_upload_queue_empty,
|
||||||
|
)
|
||||||
|
from fixtures.remote_storage import s3_storage
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.timeout(900)
|
||||||
|
@pytest.mark.parametrize("size", [8, 1024, 8192])
|
||||||
|
@pytest.mark.parametrize("s3", [True, False], ids=["s3", "local"])
|
||||||
|
@pytest.mark.parametrize("backpressure", [True, False], ids=["backpressure", "nobackpressure"])
|
||||||
|
@pytest.mark.parametrize("fsync", [True, False], ids=["fsync", "nofsync"])
|
||||||
|
def test_ingest_insert_bulk(
|
||||||
|
request: pytest.FixtureRequest,
|
||||||
|
neon_env_builder: NeonEnvBuilder,
|
||||||
|
zenbenchmark: NeonBenchmarker,
|
||||||
|
fsync: bool,
|
||||||
|
backpressure: bool,
|
||||||
|
s3: bool,
|
||||||
|
size: int,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Benchmarks ingestion of 5 GB of sequential insert WAL. Measures ingestion and S3 upload
|
||||||
|
separately. Also does a Safekeeper→Pageserver re-ingestion to measure Pageserver ingestion in
|
||||||
|
isolation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
CONCURRENCY = 1 # 1 is optimal without fsync or backpressure
|
||||||
|
VOLUME = 5 * 1024**3
|
||||||
|
rows = VOLUME // (size + 64) # +64 roughly accounts for per-row WAL overhead
|
||||||
|
|
||||||
|
neon_env_builder.safekeepers_enable_fsync = fsync
|
||||||
|
|
||||||
|
if s3:
|
||||||
|
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||||
|
# NB: don't use S3 for Safekeeper. It doesn't affect throughput (no backpressure), but it
|
||||||
|
# would compete with Pageserver for bandwidth.
|
||||||
|
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
|
||||||
|
|
||||||
|
neon_env_builder.disable_scrub_on_exit() # immediate shutdown may leave stray layers
|
||||||
|
env = neon_env_builder.init_start()
|
||||||
|
|
||||||
|
endpoint = env.endpoints.create_start(
|
||||||
|
"main",
|
||||||
|
config_lines=[
|
||||||
|
f"fsync = {fsync}",
|
||||||
|
"max_replication_apply_lag = 0",
|
||||||
|
f"max_replication_flush_lag = {'10GB' if backpressure else '0'}",
|
||||||
|
# NB: neon_local defaults to 15MB, which is too slow -- production uses 500MB.
|
||||||
|
f"max_replication_write_lag = {'500MB' if backpressure else '0'}",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
endpoint.safe_psql("create extension neon")
|
||||||
|
|
||||||
|
# Wait for the timeline to be propagated to the pageserver.
|
||||||
|
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||||
|
|
||||||
|
# Ingest rows.
|
||||||
|
log.info("Ingesting data")
|
||||||
|
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||||
|
|
||||||
|
def insert_rows(endpoint, table, count, value):
|
||||||
|
with endpoint.connect().cursor() as cur:
|
||||||
|
cur.execute("set statement_timeout = 0")
|
||||||
|
cur.execute(f"create table {table} (id int, data bytea)")
|
||||||
|
cur.execute(f"insert into {table} values (generate_series(1, {count}), %s)", (value,))
|
||||||
|
|
||||||
|
with zenbenchmark.record_duration("upload"):
|
||||||
|
with zenbenchmark.record_duration("ingest"):
|
||||||
|
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
|
||||||
|
for i in range(CONCURRENCY):
|
||||||
|
# Write a random value for all rows. This is sufficient to prevent compression,
|
||||||
|
# e.g. in TOAST. Randomly generating every row is too slow.
|
||||||
|
value = random.randbytes(size)
|
||||||
|
worker_rows = rows / CONCURRENCY
|
||||||
|
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
|
||||||
|
|
||||||
|
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||||
|
|
||||||
|
# Wait for pageserver to ingest the WAL.
|
||||||
|
client = env.pageserver.http_client()
|
||||||
|
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
|
||||||
|
|
||||||
|
# Wait for pageserver S3 upload. Checkpoint to flush the last in-memory layer.
|
||||||
|
client.timeline_checkpoint(
|
||||||
|
env.initial_tenant,
|
||||||
|
env.initial_timeline,
|
||||||
|
compact=False,
|
||||||
|
wait_until_flushed=False,
|
||||||
|
)
|
||||||
|
wait_for_upload(client, env.initial_tenant, env.initial_timeline, end_lsn, timeout=600)
|
||||||
|
|
||||||
|
# Empty out upload queue for next benchmark.
|
||||||
|
wait_for_upload_queue_empty(client, env.initial_tenant, env.initial_timeline)
|
||||||
|
|
||||||
|
backpressure_time = endpoint.safe_psql("select backpressure_throttling_time()")[0][0]
|
||||||
|
|
||||||
|
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
|
||||||
|
# reingest all the WAL directly from the safekeeper. This gives us a baseline of how fast the
|
||||||
|
# pageserver can ingest this WAL in isolation.
|
||||||
|
status = env.storage_controller.inspect(tenant_shard_id=env.initial_tenant)
|
||||||
|
assert status is not None
|
||||||
|
|
||||||
|
endpoint.stop() # avoid spurious getpage errors
|
||||||
|
client.tenant_delete(env.initial_tenant)
|
||||||
|
env.pageserver.tenant_create(tenant_id=env.initial_tenant, generation=status[0])
|
||||||
|
|
||||||
|
with zenbenchmark.record_duration("recover"):
|
||||||
|
log.info("Recovering WAL into pageserver")
|
||||||
|
client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
|
||||||
|
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
|
||||||
|
|
||||||
|
# Emit metrics.
|
||||||
|
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||||
|
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||||
|
zenbenchmark.record("row_count", rows, "rows", MetricReport.TEST_PARAM)
|
||||||
|
zenbenchmark.record("concurrency", CONCURRENCY, "clients", MetricReport.TEST_PARAM)
|
||||||
|
zenbenchmark.record(
|
||||||
|
"backpressure_time", backpressure_time // 1000, "ms", MetricReport.LOWER_IS_BETTER
|
||||||
|
)
|
||||||
|
|
||||||
|
props = {p["name"]: p["value"] for _, p in request.node.user_properties}
|
||||||
|
for name in ("ingest", "upload", "recover"):
|
||||||
|
throughput = int(wal_written_mb / props[name])
|
||||||
|
zenbenchmark.record(f"{name}_throughput", throughput, "MB/s", MetricReport.HIGHER_IS_BETTER)
|
||||||
|
|
||||||
|
# Pageserver shutdown will likely get stuck on the upload queue, just shut it down immediately.
|
||||||
|
env.stop(immediate=True)
|
||||||
@@ -153,19 +153,20 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
|||||||
if i % 10 == 0:
|
if i % 10 == 0:
|
||||||
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
||||||
|
|
||||||
ps_http.timeline_compact(
|
# Run gc-compaction every 10 rounds to ensure the test doesn't take too long time.
|
||||||
tenant_id,
|
ps_http.timeline_compact(
|
||||||
timeline_id,
|
tenant_id,
|
||||||
enhanced_gc_bottom_most_compaction=True,
|
timeline_id,
|
||||||
body={
|
enhanced_gc_bottom_most_compaction=True,
|
||||||
"scheduled": True,
|
body={
|
||||||
"compact_range": {
|
"scheduled": True,
|
||||||
"start": "000000000000000000000000000000000000",
|
"sub_compaction": True,
|
||||||
# skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this
|
"compact_range": {
|
||||||
"end": "010000000000000000000000000000000000",
|
"start": "000000000000000000000000000000000000",
|
||||||
|
"end": "030000000000000000000000000000000000",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
)
|
||||||
)
|
|
||||||
|
|
||||||
workload.churn_rows(row_count, env.pageserver.id)
|
workload.churn_rows(row_count, env.pageserver.id)
|
||||||
|
|
||||||
@@ -177,6 +178,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
|||||||
log.info("Validating at workload end ...")
|
log.info("Validating at workload end ...")
|
||||||
workload.validate(env.pageserver.id)
|
workload.validate(env.pageserver.id)
|
||||||
|
|
||||||
|
# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
|
||||||
|
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
|
||||||
|
ps_http.timeline_gc(tenant_id, timeline_id, None)
|
||||||
|
|
||||||
|
|
||||||
# Stripe sizes in number of pages.
|
# Stripe sizes in number of pages.
|
||||||
TINY_STRIPES = 16
|
TINY_STRIPES = 16
|
||||||
|
|||||||
@@ -215,7 +215,7 @@ if SQL_EXPORTER is None:
|
|||||||
#
|
#
|
||||||
# The "host" network mode allows sql_exporter to talk to the
|
# The "host" network mode allows sql_exporter to talk to the
|
||||||
# endpoint which is running on the host.
|
# endpoint which is running on the host.
|
||||||
super().__init__("docker.io/burningalchemist/sql_exporter:0.13.1", network_mode="host")
|
super().__init__("docker.io/burningalchemist/sql_exporter:0.16.0", network_mode="host")
|
||||||
|
|
||||||
self.__logs_dir = logs_dir
|
self.__logs_dir = logs_dir
|
||||||
self.__port = port
|
self.__port = port
|
||||||
|
|||||||
@@ -3230,3 +3230,55 @@ def test_multi_attached_timeline_creation(neon_env_builder: NeonEnvBuilder, migr
|
|||||||
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
|
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
|
||||||
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
|
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@run_only_on_default_postgres("Postgres version makes no difference here")
|
||||||
|
def test_storage_controller_detached_stopped(
|
||||||
|
neon_env_builder: NeonEnvBuilder,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Test that detaching a tenant while it has scheduling policy set to Paused or Stop works
|
||||||
|
"""
|
||||||
|
|
||||||
|
remote_storage_kind = s3_storage()
|
||||||
|
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||||
|
|
||||||
|
neon_env_builder.num_pageservers = 1
|
||||||
|
|
||||||
|
env = neon_env_builder.init_configs()
|
||||||
|
env.start()
|
||||||
|
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||||
|
|
||||||
|
tenant_id = TenantId.generate()
|
||||||
|
env.storage_controller.tenant_create(
|
||||||
|
tenant_id,
|
||||||
|
shard_count=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
|
||||||
|
|
||||||
|
# Disable scheduling: ordinarily this would prevent the tenant's configuration being
|
||||||
|
# reconciled to pageservers, but this should be overridden when detaching.
|
||||||
|
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy.*")
|
||||||
|
env.storage_controller.tenant_policy_update(
|
||||||
|
tenant_id,
|
||||||
|
{"scheduling": "Stop"},
|
||||||
|
)
|
||||||
|
|
||||||
|
env.storage_controller.consistency_check()
|
||||||
|
|
||||||
|
# Detach the tenant
|
||||||
|
virtual_ps_http.tenant_location_conf(
|
||||||
|
tenant_id,
|
||||||
|
{
|
||||||
|
"mode": "Detached",
|
||||||
|
"secondary_conf": None,
|
||||||
|
"tenant_conf": {},
|
||||||
|
"generation": None,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
env.storage_controller.consistency_check()
|
||||||
|
|
||||||
|
# Confirm the detach happened
|
||||||
|
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ deranged = { version = "0.3", default-features = false, features = ["powerfmt",
|
|||||||
digest = { version = "0.10", features = ["mac", "oid", "std"] }
|
digest = { version = "0.10", features = ["mac", "oid", "std"] }
|
||||||
either = { version = "1" }
|
either = { version = "1" }
|
||||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||||
|
form_urlencoded = { version = "1" }
|
||||||
futures-channel = { version = "0.3", features = ["sink"] }
|
futures-channel = { version = "0.3", features = ["sink"] }
|
||||||
futures-executor = { version = "0.3" }
|
futures-executor = { version = "0.3" }
|
||||||
futures-io = { version = "0.3" }
|
futures-io = { version = "0.3" }
|
||||||
@@ -78,6 +79,7 @@ sha2 = { version = "0.10", features = ["asm", "oid"] }
|
|||||||
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
|
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
|
||||||
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
|
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
|
||||||
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
|
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
|
||||||
|
stable_deref_trait = { version = "1" }
|
||||||
subtle = { version = "2" }
|
subtle = { version = "2" }
|
||||||
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
|
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
|
||||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
|
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
|
||||||
@@ -105,6 +107,7 @@ anyhow = { version = "1", features = ["backtrace"] }
|
|||||||
bytes = { version = "1", features = ["serde"] }
|
bytes = { version = "1", features = ["serde"] }
|
||||||
cc = { version = "1", default-features = false, features = ["parallel"] }
|
cc = { version = "1", default-features = false, features = ["parallel"] }
|
||||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
|
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
|
||||||
|
displaydoc = { version = "0.2" }
|
||||||
either = { version = "1" }
|
either = { version = "1" }
|
||||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||||
half = { version = "2", default-features = false, features = ["num-traits"] }
|
half = { version = "2", default-features = false, features = ["num-traits"] }
|
||||||
|
|||||||
Reference in New Issue
Block a user