mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
Compare commits
3 Commits
alexk/comp
...
problame/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fec3013fcd | ||
|
|
4b04e3b55e | ||
|
|
436178faee |
41
.github/workflows/regenerate-pg-setting.yml
vendored
41
.github/workflows/regenerate-pg-setting.yml
vendored
@@ -1,41 +0,0 @@
|
||||
name: Regenerate Postgres Settings
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types:
|
||||
- opened
|
||||
- synchronize
|
||||
- reopened
|
||||
paths:
|
||||
- pgxn/neon/**.c
|
||||
- vendor/postgres-v*
|
||||
- vendor/revisions.json
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.head_ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
regenerate-pg-settings:
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
steps:
|
||||
- name: Add comment
|
||||
uses: thollander/actions-comment-pull-request@v3
|
||||
with:
|
||||
comment-tag: ${{ github.job }}
|
||||
pr-number: ${{ github.event.number }}
|
||||
message: |
|
||||
If this PR added a GUC in the Postgres fork or `neon` extension,
|
||||
please regenerate the Postgres settings in the `cloud` repo:
|
||||
|
||||
```
|
||||
make NEON_WORKDIR=path/to/neon/checkout \
|
||||
-C goapp/internal/shareddomain/postgres generate
|
||||
```
|
||||
|
||||
If you're an external contributor, a Neon employee will assist in
|
||||
making sure this step is done.
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1303,7 +1303,6 @@ dependencies = [
|
||||
"aws-config",
|
||||
"aws-sdk-kms",
|
||||
"aws-sdk-s3",
|
||||
"aws-smithy-types",
|
||||
"axum",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
@@ -1352,7 +1351,6 @@ dependencies = [
|
||||
"utils",
|
||||
"uuid",
|
||||
"vm_monitor",
|
||||
"walkdir",
|
||||
"workspace_hack",
|
||||
"zstd",
|
||||
]
|
||||
|
||||
@@ -1695,6 +1695,29 @@ RUN if [ "$TARGETARCH" = "amd64" ]; then\
|
||||
&& echo "${pgbouncer_exporter_sha256} pgbouncer_exporter" | sha256sum -c -\
|
||||
&& echo "${sql_exporter_sha256} sql_exporter" | sha256sum -c -
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "awscli"
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS awscli
|
||||
ARG TARGETARCH
|
||||
RUN set -ex; \
|
||||
if [ "${TARGETARCH}" = "amd64" ]; then \
|
||||
TARGETARCH_ALT="x86_64"; \
|
||||
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
|
||||
elif [ "${TARGETARCH}" = "arm64" ]; then \
|
||||
TARGETARCH_ALT="aarch64"; \
|
||||
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
|
||||
else \
|
||||
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
|
||||
fi; \
|
||||
curl --retry 5 -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
|
||||
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
|
||||
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
|
||||
/tmp/awscliv2/aws/install; \
|
||||
rm -rf /tmp/awscliv2.zip /tmp/awscliv2
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Clean up postgres folder before inclusion
|
||||
@@ -1864,6 +1887,9 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
|
||||
mkdir /usr/local/download_extensions && \
|
||||
chown -R postgres:postgres /usr/local/download_extensions
|
||||
|
||||
# aws cli is used by fast_import
|
||||
COPY --from=awscli /usr/local/aws-cli /usr/local/aws-cli
|
||||
|
||||
# pgbouncer and its config
|
||||
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer
|
||||
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini
|
||||
|
||||
@@ -47,9 +47,7 @@ files:
|
||||
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
|
||||
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
|
||||
# regardless of hostname (ALL)
|
||||
#
|
||||
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
|
||||
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
|
||||
- filename: cgconfig.conf
|
||||
content: |
|
||||
# Configuration for cgroups in VM compute nodes
|
||||
|
||||
@@ -14,7 +14,6 @@ base64.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
aws-sdk-kms.workspace = true
|
||||
aws-smithy-types.workspace = true
|
||||
anyhow.workspace = true
|
||||
axum = { workspace = true, features = [] }
|
||||
camino.workspace = true
|
||||
@@ -55,7 +54,6 @@ thiserror.workspace = true
|
||||
url.workspace = true
|
||||
uuid.workspace = true
|
||||
prometheus.workspace = true
|
||||
walkdir.workspace = true
|
||||
|
||||
postgres_initdb.workspace = true
|
||||
compute_api.workspace = true
|
||||
|
||||
@@ -421,7 +421,6 @@ async fn run_dump_restore(
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn cmd_pgdata(
|
||||
s3_client: Option<aws_sdk_s3::Client>,
|
||||
kms_client: Option<aws_sdk_kms::Client>,
|
||||
maybe_s3_prefix: Option<s3_uri::S3Uri>,
|
||||
maybe_spec: Option<Spec>,
|
||||
@@ -489,13 +488,9 @@ async fn cmd_pgdata(
|
||||
// Only sync if s3_prefix was specified
|
||||
if let Some(s3_prefix) = maybe_s3_prefix {
|
||||
info!("upload pgdata");
|
||||
aws_s3_sync::upload_dir_recursive(
|
||||
s3_client.as_ref().unwrap(),
|
||||
Utf8Path::new(&pgdata_dir),
|
||||
&s3_prefix.append("/pgdata/"),
|
||||
)
|
||||
.await
|
||||
.context("sync dump directory to destination")?;
|
||||
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
|
||||
.await
|
||||
.context("sync dump directory to destination")?;
|
||||
|
||||
info!("write status");
|
||||
{
|
||||
@@ -504,13 +499,9 @@ async fn cmd_pgdata(
|
||||
let status_file = status_dir.join("pgdata");
|
||||
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
|
||||
.context("write status file")?;
|
||||
aws_s3_sync::upload_dir_recursive(
|
||||
s3_client.as_ref().unwrap(),
|
||||
&status_dir,
|
||||
&s3_prefix.append("/status/"),
|
||||
)
|
||||
.await
|
||||
.context("sync status directory to destination")?;
|
||||
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
|
||||
.await
|
||||
.context("sync status directory to destination")?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -582,20 +573,18 @@ pub(crate) async fn main() -> anyhow::Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
// Initialize AWS clients only if s3_prefix is specified
|
||||
let (s3_client, kms_client) = if args.s3_prefix.is_some() {
|
||||
let (aws_config, kms_client) = if args.s3_prefix.is_some() {
|
||||
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
|
||||
let s3_client = aws_sdk_s3::Client::new(&config);
|
||||
let kms = aws_sdk_kms::Client::new(&config);
|
||||
(Some(s3_client), Some(kms))
|
||||
(Some(config), Some(kms))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
|
||||
let spec_key = s3_prefix.append("/spec.json");
|
||||
let s3_client = aws_sdk_s3::Client::new(aws_config.as_ref().unwrap());
|
||||
let object = s3_client
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get_object()
|
||||
.bucket(&spec_key.bucket)
|
||||
.key(spec_key.key)
|
||||
@@ -635,7 +624,6 @@ pub(crate) async fn main() -> anyhow::Result<()> {
|
||||
memory_mb,
|
||||
} => {
|
||||
cmd_pgdata(
|
||||
s3_client,
|
||||
kms_client,
|
||||
args.s3_prefix,
|
||||
spec,
|
||||
|
||||
@@ -1,102 +1,24 @@
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use tokio::task::JoinSet;
|
||||
use walkdir::WalkDir;
|
||||
use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
|
||||
use super::s3_uri::S3Uri;
|
||||
|
||||
use tracing::{info, warn};
|
||||
|
||||
const MAX_PARALLEL_UPLOADS: usize = 10;
|
||||
|
||||
/// Upload all files from 'local' to 'remote'
|
||||
pub(crate) async fn upload_dir_recursive(
|
||||
s3_client: &aws_sdk_s3::Client,
|
||||
local: &Utf8Path,
|
||||
remote: &S3Uri,
|
||||
) -> anyhow::Result<()> {
|
||||
// Recursively scan directory
|
||||
let mut dirwalker = WalkDir::new(local)
|
||||
.into_iter()
|
||||
.map(|entry| {
|
||||
let entry = entry?;
|
||||
let file_type = entry.file_type();
|
||||
let path = <&Utf8Path>::try_from(entry.path())?.to_path_buf();
|
||||
Ok((file_type, path))
|
||||
})
|
||||
.filter_map(|e: anyhow::Result<(std::fs::FileType, Utf8PathBuf)>| {
|
||||
match e {
|
||||
Ok((file_type, path)) if file_type.is_file() => Some(Ok(path)),
|
||||
Ok((file_type, _path)) if file_type.is_dir() => {
|
||||
// The WalkDir iterator will recurse into directories, but we don't want
|
||||
// to do anything with directories as such. There's no concept of uploading
|
||||
// an empty directory to S3.
|
||||
None
|
||||
}
|
||||
Ok((file_type, path)) if file_type.is_symlink() => {
|
||||
// huh, didn't expect a symlink. Can't upload that to S3. Warn and skip.
|
||||
warn!("cannot upload symlink ({})", path);
|
||||
None
|
||||
}
|
||||
Ok((_file_type, path)) => {
|
||||
// should not happen
|
||||
warn!("directory entry has unexpected type ({})", path);
|
||||
None
|
||||
}
|
||||
Err(e) => Some(Err(e)),
|
||||
}
|
||||
});
|
||||
|
||||
// Spawn upload tasks for each file, keeping MAX_PARALLEL_UPLOADS active in
|
||||
// parallel.
|
||||
let mut joinset = JoinSet::new();
|
||||
loop {
|
||||
// Could we upload more?
|
||||
while joinset.len() < MAX_PARALLEL_UPLOADS {
|
||||
if let Some(full_local_path) = dirwalker.next() {
|
||||
let full_local_path = full_local_path?;
|
||||
let relative_local_path = full_local_path
|
||||
.strip_prefix(local)
|
||||
.expect("all paths start from the walkdir root");
|
||||
let remote_path = remote.append(relative_local_path.as_str());
|
||||
info!(
|
||||
"starting upload of {} to {}",
|
||||
&full_local_path, &remote_path
|
||||
);
|
||||
let upload_task = upload_file(s3_client.clone(), full_local_path, remote_path);
|
||||
joinset.spawn(upload_task);
|
||||
} else {
|
||||
info!("draining upload tasks");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for an upload to complete
|
||||
if let Some(res) = joinset.join_next().await {
|
||||
let _ = res?;
|
||||
} else {
|
||||
// all done!
|
||||
break;
|
||||
}
|
||||
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
|
||||
let mut builder = tokio::process::Command::new("aws");
|
||||
builder
|
||||
.arg("s3")
|
||||
.arg("sync")
|
||||
.arg(local.as_str())
|
||||
.arg(remote.to_string());
|
||||
let st = builder
|
||||
.spawn()
|
||||
.context("spawn aws s3 sync")?
|
||||
.wait()
|
||||
.await
|
||||
.context("wait for aws s3 sync")?;
|
||||
if st.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow::anyhow!("aws s3 sync failed"))
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn upload_file(
|
||||
s3_client: aws_sdk_s3::Client,
|
||||
local_path: Utf8PathBuf,
|
||||
remote: S3Uri,
|
||||
) -> anyhow::Result<()> {
|
||||
use aws_smithy_types::byte_stream::ByteStream;
|
||||
let stream = ByteStream::from_path(&local_path).await?;
|
||||
|
||||
let _result = s3_client
|
||||
.put_object()
|
||||
.bucket(remote.bucket)
|
||||
.key(&remote.key)
|
||||
.body(stream)
|
||||
.send()
|
||||
.await?;
|
||||
info!("upload of {} to {} finished", &local_path, &remote.key);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ use std::{
|
||||
|
||||
use anyhow::Result;
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::Request,
|
||||
middleware::{self, Next},
|
||||
response::{IntoResponse, Response},
|
||||
@@ -17,7 +16,6 @@ use axum::{
|
||||
use http::StatusCode;
|
||||
use tokio::net::TcpListener;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::classify::ServerErrorsFailureClass;
|
||||
use tower_http::{request_id::PropagateRequestIdLayer, trace::TraceLayer};
|
||||
use tracing::{debug, error, info, Span};
|
||||
use uuid::Uuid;
|
||||
@@ -86,85 +84,46 @@ impl From<Server> for Router<Arc<ComputeNode>> {
|
||||
.route("/terminate", post(terminate::terminate)),
|
||||
};
|
||||
|
||||
router
|
||||
.fallback(Server::handle_404)
|
||||
.method_not_allowed_fallback(Server::handle_405)
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
// Add this middleware since we assume the request ID exists
|
||||
.layer(middleware::from_fn(maybe_add_request_id_header))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(|request: &Request<Body>| {
|
||||
let request_id = request
|
||||
router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer(
|
||||
ServiceBuilder::new()
|
||||
// Add this middleware since we assume the request ID exists
|
||||
.layer(middleware::from_fn(maybe_add_request_id_header))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
let request_id = request
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
match request.uri().path() {
|
||||
"/metrics" => {
|
||||
debug!(%request_id, "{} {}", request.method(), request.uri())
|
||||
}
|
||||
_ => info!(%request_id, "{} {}", request.method(), request.uri()),
|
||||
};
|
||||
})
|
||||
.on_response(
|
||||
|response: &http::Response<_>, latency: Duration, _span: &Span| {
|
||||
let request_id = response
|
||||
.headers()
|
||||
.get(X_REQUEST_ID)
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap();
|
||||
|
||||
match request.uri().path() {
|
||||
"/metrics" => {
|
||||
tracing::span!(
|
||||
tracing::Level::DEBUG,
|
||||
"",
|
||||
method = tracing::field::display(request.method()),
|
||||
uri = tracing::field::display(request.uri()),
|
||||
request_id = tracing::field::display(request_id)
|
||||
)
|
||||
}
|
||||
_ => tracing::span!(
|
||||
tracing::Level::INFO,
|
||||
"",
|
||||
method = tracing::field::display(request.method()),
|
||||
uri = tracing::field::display(request.uri()),
|
||||
request_id = tracing::field::display(request_id)
|
||||
),
|
||||
}
|
||||
})
|
||||
.on_request(|request: &http::Request<_>, _span: &Span| {
|
||||
match request.uri().path() {
|
||||
"/metrics" => debug!("incoming request"),
|
||||
_ => info!("incoming request"),
|
||||
};
|
||||
})
|
||||
.on_response(
|
||||
|response: &http::Response<_>, latency: Duration, _span: &Span| {
|
||||
// All errors will be logged in the on_failure handler
|
||||
if let 200..=399 = response.status().as_u16() {
|
||||
info!(
|
||||
message = "request finished",
|
||||
code = %response.status().as_u16(),
|
||||
latency_ms = %latency.as_millis()
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
.on_failure(
|
||||
|error: ServerErrorsFailureClass,
|
||||
latency: Duration,
|
||||
_span: &Span| {
|
||||
match error {
|
||||
ServerErrorsFailureClass::StatusCode(code) => {
|
||||
error!(
|
||||
message = "request failed",
|
||||
code = %code,
|
||||
latency_ms = %latency.as_millis()
|
||||
);
|
||||
}
|
||||
ServerErrorsFailureClass::Error(error) => {
|
||||
error!(
|
||||
message = "request failed unexpectedly",
|
||||
error = %error,
|
||||
latency_ms = %latency.as_millis()
|
||||
);
|
||||
}
|
||||
}
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(PropagateRequestIdLayer::x_request_id()),
|
||||
)
|
||||
info!(
|
||||
%request_id,
|
||||
code = response.status().as_u16(),
|
||||
latency = latency.as_millis()
|
||||
)
|
||||
},
|
||||
),
|
||||
)
|
||||
.layer(PropagateRequestIdLayer::x_request_id()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -202,7 +202,6 @@ pub struct TimelineStatus {
|
||||
pub commit_lsn: Lsn,
|
||||
pub backup_lsn: Lsn,
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
pub peers: Vec<PeerInfo>,
|
||||
pub walsenders: Vec<WalSenderState>,
|
||||
pub walreceivers: Vec<WalReceiverState>,
|
||||
|
||||
@@ -5,7 +5,7 @@ use pq_proto::{read_cstr, PG_EPOCH};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::{trace, warn};
|
||||
|
||||
use crate::lsn::Lsn;
|
||||
use crate::{generation::Generation, lsn::Lsn};
|
||||
|
||||
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
|
||||
///
|
||||
@@ -32,6 +32,12 @@ pub struct PageserverFeedback {
|
||||
pub replytime: SystemTime,
|
||||
/// Used to track feedbacks from different shards. Always zero for unsharded tenants.
|
||||
pub shard_number: u32,
|
||||
/// The shard's pageserver-side generation number.
|
||||
/// Used to track `remote_consistent_lsn` by generation which is required
|
||||
/// to determine whether
|
||||
/// - WAL offers still need to be sent
|
||||
/// - in future: whether WAL can be evicted and/or pruned
|
||||
pub generation: Generation,
|
||||
}
|
||||
|
||||
impl PageserverFeedback {
|
||||
@@ -43,6 +49,7 @@ impl PageserverFeedback {
|
||||
disk_consistent_lsn: Lsn::INVALID,
|
||||
replytime: *PG_EPOCH,
|
||||
shard_number: 0,
|
||||
generation: Generation::none(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,6 +108,8 @@ impl PageserverFeedback {
|
||||
buf.put_u32(self.shard_number);
|
||||
}
|
||||
|
||||
todo!("ps_generation");
|
||||
|
||||
buf[buf_ptr] = nkeys;
|
||||
}
|
||||
|
||||
@@ -147,6 +156,9 @@ impl PageserverFeedback {
|
||||
assert_eq!(len, 4);
|
||||
rf.shard_number = buf.get_u32();
|
||||
}
|
||||
b"ps_generation" => {
|
||||
todo!();
|
||||
}
|
||||
_ => {
|
||||
let len = buf.get_i32();
|
||||
warn!(
|
||||
|
||||
@@ -266,7 +266,6 @@ components:
|
||||
- flush_lsn
|
||||
- commit_lsn
|
||||
- backup_lsn
|
||||
- remote_consistent_lsn
|
||||
- peer_horizon_lsn
|
||||
- safekeeper_connstr
|
||||
properties:
|
||||
@@ -279,8 +278,6 @@ components:
|
||||
type: string
|
||||
backup_lsn:
|
||||
type: string
|
||||
remote_consistent_lsn:
|
||||
type: string
|
||||
peer_horizon_lsn:
|
||||
type: string
|
||||
safekeeper_connstr:
|
||||
@@ -325,8 +322,6 @@ components:
|
||||
type: string
|
||||
peer_horizon_lsn:
|
||||
type: string
|
||||
remote_consistent_lsn:
|
||||
type: string
|
||||
|
||||
AcceptorStateStatus:
|
||||
type: object
|
||||
|
||||
@@ -199,7 +199,6 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
commit_lsn: inmem.commit_lsn,
|
||||
backup_lsn: inmem.backup_lsn,
|
||||
peer_horizon_lsn: inmem.peer_horizon_lsn,
|
||||
remote_consistent_lsn: inmem.remote_consistent_lsn,
|
||||
peers: tli.get_peers(conf).await,
|
||||
walsenders: tli.get_walsenders().get_all_public(),
|
||||
walreceivers: tli.get_walreceivers().get_all(),
|
||||
@@ -456,7 +455,6 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
last_log_term: sk_info.last_log_term.unwrap_or(0),
|
||||
flush_lsn: sk_info.flush_lsn.0,
|
||||
commit_lsn: sk_info.commit_lsn.0,
|
||||
remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
|
||||
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
|
||||
|
||||
@@ -471,7 +471,6 @@ pub struct TimelineCollector {
|
||||
flush_lsn: GenericGaugeVec<AtomicU64>,
|
||||
epoch_start_lsn: GenericGaugeVec<AtomicU64>,
|
||||
peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
|
||||
remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
|
||||
ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
|
||||
feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
|
||||
ps_feedback_count: GenericGaugeVec<AtomicU64>,
|
||||
@@ -543,16 +542,6 @@ impl TimelineCollector {
|
||||
.unwrap();
|
||||
descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
|
||||
|
||||
let remote_consistent_lsn = GenericGaugeVec::new(
|
||||
Opts::new(
|
||||
"safekeeper_remote_consistent_lsn",
|
||||
"LSN which is persisted to the remote storage in pageserver",
|
||||
),
|
||||
&["tenant_id", "timeline_id"],
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
|
||||
|
||||
let ps_last_received_lsn = GenericGaugeVec::new(
|
||||
Opts::new(
|
||||
"safekeeper_ps_last_received_lsn",
|
||||
@@ -698,7 +687,6 @@ impl TimelineCollector {
|
||||
flush_lsn,
|
||||
epoch_start_lsn,
|
||||
peer_horizon_lsn,
|
||||
remote_consistent_lsn,
|
||||
ps_last_received_lsn,
|
||||
feedback_last_time_seconds,
|
||||
ps_feedback_count,
|
||||
@@ -732,7 +720,6 @@ impl Collector for TimelineCollector {
|
||||
self.flush_lsn.reset();
|
||||
self.epoch_start_lsn.reset();
|
||||
self.peer_horizon_lsn.reset();
|
||||
self.remote_consistent_lsn.reset();
|
||||
self.ps_last_received_lsn.reset();
|
||||
self.feedback_last_time_seconds.reset();
|
||||
self.ps_feedback_count.reset();
|
||||
@@ -786,9 +773,6 @@ impl Collector for TimelineCollector {
|
||||
self.peer_horizon_lsn
|
||||
.with_label_values(labels)
|
||||
.set(tli.mem_state.peer_horizon_lsn.into());
|
||||
self.remote_consistent_lsn
|
||||
.with_label_values(labels)
|
||||
.set(tli.mem_state.remote_consistent_lsn.into());
|
||||
self.timeline_active
|
||||
.with_label_values(labels)
|
||||
.set(tli.timeline_is_active as u64);
|
||||
@@ -849,7 +833,6 @@ impl Collector for TimelineCollector {
|
||||
mfs.extend(self.flush_lsn.collect());
|
||||
mfs.extend(self.epoch_start_lsn.collect());
|
||||
mfs.extend(self.peer_horizon_lsn.collect());
|
||||
mfs.extend(self.remote_consistent_lsn.collect());
|
||||
mfs.extend(self.ps_last_received_lsn.collect());
|
||||
mfs.extend(self.feedback_last_time_seconds.collect());
|
||||
mfs.extend(self.ps_feedback_count.collect());
|
||||
|
||||
@@ -308,10 +308,8 @@ impl WalResidentTimeline {
|
||||
// removed further than `backup_lsn`. Since we're holding shared_state
|
||||
// lock and setting `wal_removal_on_hold` later, it guarantees that WAL
|
||||
// won't be removed until we're done.
|
||||
let from_lsn = min(
|
||||
shared_state.sk.state().remote_consistent_lsn,
|
||||
shared_state.sk.state().backup_lsn,
|
||||
);
|
||||
// TODO: do we still need this snapshot code path?
|
||||
let from_lsn = shared_state.sk.state().backup_lsn;
|
||||
if from_lsn == Lsn::INVALID {
|
||||
// this is possible if snapshot is called before handling first
|
||||
// elected message
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::timeline_manager::StateSnapshot;
|
||||
/// Get oldest LSN we still need to keep.
|
||||
///
|
||||
/// We hold WAL till it is consumed by
|
||||
/// 1) pageserver (remote_consistent_lsn)
|
||||
/// 1) pageserver (min_remote_consistent_lsn)
|
||||
/// 2) s3 offloading.
|
||||
/// 3) Additionally we must store WAL since last local commit_lsn because
|
||||
/// that's where we start looking for last WAL record on start.
|
||||
@@ -17,7 +17,7 @@ use crate::timeline_manager::StateSnapshot;
|
||||
pub(crate) fn calc_horizon_lsn(state: &StateSnapshot, extra_horizon_lsn: Option<Lsn>) -> Lsn {
|
||||
use std::cmp::min;
|
||||
|
||||
let mut horizon_lsn = state.cfile_remote_consistent_lsn;
|
||||
let mut horizon_lsn = state.min_remote_consistent_lsn;
|
||||
// we don't want to remove WAL that is not yet offloaded to s3
|
||||
horizon_lsn = min(horizon_lsn, state.cfile_backup_lsn);
|
||||
// Min by local commit_lsn to be able to begin reading WAL from somewhere on
|
||||
|
||||
@@ -560,19 +560,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
|
||||
// Send a periodic keep alive when the connection has been idle for a while.
|
||||
// Since we've been idle, also check if we can stop streaming.
|
||||
_ = keepalive_ticker.tick() => {
|
||||
if let Some(remote_consistent_lsn) = self.wal_sender_guard
|
||||
.walsenders()
|
||||
.get_ws_remote_consistent_lsn(self.wal_sender_guard.id())
|
||||
{
|
||||
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Stop streaming if the receivers are caught up and
|
||||
// there's no active compute. This causes the loop in
|
||||
// [`crate::send_interpreted_wal::InterpretedWalSender::run`]
|
||||
// to exit and terminate the WAL stream.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
self.pgb
|
||||
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
wal_end: self.end_watch_view.get().0,
|
||||
|
||||
@@ -251,17 +251,6 @@ impl WalSenders {
|
||||
shared.update_reply_feedback();
|
||||
}
|
||||
|
||||
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
|
||||
/// client is not pageserver.
|
||||
pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
let shared = self.mutex.lock();
|
||||
let slot = shared.get_slot(id);
|
||||
match slot.get_feedback() {
|
||||
ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Unregister walsender.
|
||||
fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
|
||||
let mut shared = self.mutex.lock();
|
||||
@@ -890,28 +879,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Timed out waiting for WAL, check for termination and send KA.
|
||||
// Check for termination only if we are streaming up to commit_lsn
|
||||
// (to pageserver).
|
||||
if let EndWatch::Commit(_) = self.end_watch {
|
||||
if let Some(remote_consistent_lsn) = self
|
||||
.ws_guard
|
||||
.walsenders
|
||||
.get_ws_remote_consistent_lsn(self.ws_guard.id)
|
||||
{
|
||||
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Terminate if there is nothing more to send.
|
||||
// Note that "ending streaming" part of the string is used by
|
||||
// pageserver to identify WalReceiverError::SuccessfulCompletion,
|
||||
// do not change this string without updating pageserver.
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
self.appname, self.start_pos,
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let msg = BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
wal_end: self.end_pos.0,
|
||||
timestamp: get_current_timestamp(),
|
||||
@@ -1020,7 +987,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
||||
.walsenders
|
||||
.record_ps_feedback(self.ws_guard.id, &ps_feedback);
|
||||
self.tli
|
||||
.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
|
||||
.process_remote_consistent_lsn_update(
|
||||
ps_feedback.generation,
|
||||
ps_feedback.remote_consistent_lsn,
|
||||
)
|
||||
.await;
|
||||
// in principle new remote_consistent_lsn could allow to
|
||||
// deactivate the timeline, but we check that regularly through
|
||||
|
||||
@@ -61,10 +61,9 @@ pub struct TimelinePersistentState {
|
||||
/// walproposer proto called 'truncate_lsn'. Updates are currently drived
|
||||
/// only by walproposer.
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
/// LSN of the oldest known checkpoint made by pageserver and successfully
|
||||
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
|
||||
/// informational purposes, we receive it from pageserver (or broker).
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
/// Obsolete; nowadays we track remote_consistent_lsn by generation number
|
||||
/// in a separate cache with relaxed persistency requirements.
|
||||
remote_consistent_lsn: Lsn,
|
||||
/// Holds names of partial segments uploaded to remote storage. Used to
|
||||
/// clean up old objects without leaving garbage in remote storage.
|
||||
pub partial_backup: wal_backup_partial::State,
|
||||
@@ -171,7 +170,6 @@ pub struct TimelineMemState {
|
||||
pub commit_lsn: Lsn,
|
||||
pub backup_lsn: Lsn,
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
#[serde(with = "hex")]
|
||||
pub proposer_uuid: PgUuid,
|
||||
}
|
||||
@@ -198,7 +196,6 @@ where
|
||||
commit_lsn: state.commit_lsn,
|
||||
backup_lsn: state.backup_lsn,
|
||||
peer_horizon_lsn: state.peer_horizon_lsn,
|
||||
remote_consistent_lsn: state.remote_consistent_lsn,
|
||||
proposer_uuid: state.proposer_uuid,
|
||||
},
|
||||
pers: state,
|
||||
@@ -213,7 +210,6 @@ where
|
||||
s.commit_lsn = self.inmem.commit_lsn;
|
||||
s.backup_lsn = self.inmem.backup_lsn;
|
||||
s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
|
||||
s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
|
||||
s.proposer_uuid = self.inmem.proposer_uuid;
|
||||
s
|
||||
}
|
||||
@@ -230,7 +226,6 @@ where
|
||||
self.inmem.commit_lsn = s.commit_lsn;
|
||||
self.inmem.backup_lsn = s.backup_lsn;
|
||||
self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
|
||||
self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
|
||||
self.inmem.proposer_uuid = s.proposer_uuid;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use safekeeper_api::models::{
|
||||
use safekeeper_api::Term;
|
||||
use tokio::fs::{self};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TenantId;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
@@ -214,7 +215,7 @@ impl StateSK {
|
||||
StateSK::Empty => unreachable!(),
|
||||
}
|
||||
|
||||
// update everything else, including remote_consistent_lsn and backup_lsn
|
||||
// update everything else, including backup_lsn
|
||||
let mut sync_control_file = false;
|
||||
let state = self.state_mut();
|
||||
let wal_seg_size = state.server.wal_seg_size as u64;
|
||||
@@ -222,13 +223,6 @@ impl StateSK {
|
||||
state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
|
||||
sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
|
||||
|
||||
state.inmem.remote_consistent_lsn = max(
|
||||
Lsn(sk_info.remote_consistent_lsn),
|
||||
state.inmem.remote_consistent_lsn,
|
||||
);
|
||||
sync_control_file |=
|
||||
state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
|
||||
|
||||
state.inmem.peer_horizon_lsn =
|
||||
max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
|
||||
sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
|
||||
@@ -364,7 +358,6 @@ impl SharedState {
|
||||
flush_lsn: self.sk.flush_lsn().0,
|
||||
// note: this value is not flushed to control file yet and can be lost
|
||||
commit_lsn: self.sk.state().inmem.commit_lsn.0,
|
||||
remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
|
||||
peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: conf
|
||||
.advertise_pg_addr
|
||||
@@ -880,6 +873,16 @@ impl Timeline {
|
||||
pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
|
||||
self.manager_ctl.backup_partial_reset().await
|
||||
}
|
||||
|
||||
pub async fn process_remote_consistent_lsn_update(
|
||||
&self,
|
||||
generation: Generation,
|
||||
candidate: Lsn,
|
||||
) {
|
||||
// TODO: still update controlfile state for backwards compate
|
||||
|
||||
todo!("implement & use the remote_persistent_lsn cache")
|
||||
}
|
||||
}
|
||||
|
||||
/// This is a guard that allows to read/write disk timeline state.
|
||||
@@ -904,23 +907,6 @@ impl Deref for WalResidentTimeline {
|
||||
}
|
||||
|
||||
impl WalResidentTimeline {
|
||||
/// Returns true if walsender should stop sending WAL to pageserver. We
|
||||
/// terminate it if remote_consistent_lsn reached commit_lsn and there is no
|
||||
/// computes. While there might be nothing to stream already, we learn about
|
||||
/// remote_consistent_lsn update through replication feedback, and we want
|
||||
/// to stop pushing to the broker if pageserver is fully caughtup.
|
||||
pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return true;
|
||||
}
|
||||
let shared_state = self.read_shared_state().await;
|
||||
if self.walreceivers.get_num() == 0 {
|
||||
return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
|
||||
reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Ensure that current term is t, erroring otherwise, and lock the state.
|
||||
pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
|
||||
let ss = self.read_shared_state().await;
|
||||
@@ -972,15 +958,6 @@ impl WalResidentTimeline {
|
||||
pub fn get_timeline_dir(&self) -> Utf8PathBuf {
|
||||
self.timeline_dir.clone()
|
||||
}
|
||||
|
||||
/// Update in memory remote consistent lsn.
|
||||
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
|
||||
shared_state.sk.state().inmem.remote_consistent_lsn,
|
||||
candidate,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// This struct contains methods that are used by timeline manager task.
|
||||
|
||||
@@ -47,11 +47,10 @@ pub(crate) struct StateSnapshot {
|
||||
// inmem values
|
||||
pub(crate) commit_lsn: Lsn,
|
||||
pub(crate) backup_lsn: Lsn,
|
||||
pub(crate) remote_consistent_lsn: Lsn,
|
||||
pub(crate) min_remote_consistent_lsn: Lsn,
|
||||
|
||||
// persistent control file values
|
||||
pub(crate) cfile_commit_lsn: Lsn,
|
||||
pub(crate) cfile_remote_consistent_lsn: Lsn,
|
||||
pub(crate) cfile_backup_lsn: Lsn,
|
||||
|
||||
// latest state
|
||||
@@ -60,7 +59,7 @@ pub(crate) struct StateSnapshot {
|
||||
|
||||
// misc
|
||||
pub(crate) cfile_last_persist_at: std::time::Instant,
|
||||
pub(crate) inmem_flush_pending: bool,
|
||||
pub(crate) cfile_inmem_flush_pending: bool,
|
||||
pub(crate) wal_removal_on_hold: bool,
|
||||
pub(crate) peers: Vec<PeerInfo>,
|
||||
}
|
||||
@@ -72,24 +71,23 @@ impl StateSnapshot {
|
||||
Self {
|
||||
commit_lsn: state.inmem.commit_lsn,
|
||||
backup_lsn: state.inmem.backup_lsn,
|
||||
remote_consistent_lsn: state.inmem.remote_consistent_lsn,
|
||||
min_remote_consistent_lsn: todo!(""),
|
||||
cfile_commit_lsn: state.commit_lsn,
|
||||
cfile_remote_consistent_lsn: state.remote_consistent_lsn,
|
||||
cfile_backup_lsn: state.backup_lsn,
|
||||
flush_lsn: read_guard.sk.flush_lsn(),
|
||||
last_log_term: read_guard.sk.last_log_term(),
|
||||
cfile_last_persist_at: state.pers.last_persist_at(),
|
||||
inmem_flush_pending: Self::has_unflushed_inmem_state(state),
|
||||
cfile_inmem_flush_pending: Self::has_unflushed_cfile_inmem_state(state),
|
||||
wal_removal_on_hold: read_guard.wal_removal_on_hold,
|
||||
peers: read_guard.get_peers(heartbeat_timeout),
|
||||
}
|
||||
}
|
||||
|
||||
fn has_unflushed_inmem_state(state: &TimelineState<FileStorage>) -> bool {
|
||||
fn has_unflushed_cfile_inmem_state(state: &TimelineState<FileStorage>) -> bool {
|
||||
state.inmem.commit_lsn > state.commit_lsn
|
||||
|| state.inmem.backup_lsn > state.backup_lsn
|
||||
|| state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
|
||||
|| state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
|
||||
// NB: remote_consistent_lsn storage is stored separately from control file
|
||||
}
|
||||
}
|
||||
|
||||
@@ -503,14 +501,14 @@ impl Manager {
|
||||
) {
|
||||
let is_active = is_wal_backup_required
|
||||
|| num_computes > 0
|
||||
|| state.remote_consistent_lsn < state.commit_lsn;
|
||||
|| state.min_remote_consistent_lsn < state.commit_lsn;
|
||||
|
||||
// update the broker timeline set
|
||||
if self.tli_broker_active.set(is_active) {
|
||||
// write log if state has changed
|
||||
info!(
|
||||
"timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
|
||||
is_active, state.remote_consistent_lsn, state.commit_lsn,
|
||||
"timeline active={} now, min_remote_consistent_lsn={}, commit_lsn={}",
|
||||
is_active, state.min_remote_consistent_lsn, state.commit_lsn,
|
||||
);
|
||||
|
||||
MANAGER_ACTIVE_CHANGES.inc();
|
||||
@@ -528,7 +526,7 @@ impl Manager {
|
||||
state: &StateSnapshot,
|
||||
next_event: &mut Option<Instant>,
|
||||
) {
|
||||
if !state.inmem_flush_pending {
|
||||
if !state.cfile_inmem_flush_pending {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -38,8 +38,6 @@ message SafekeeperTimelineInfo {
|
||||
uint64 commit_lsn = 5;
|
||||
// LSN up to which safekeeper has backed WAL.
|
||||
uint64 backup_lsn = 6;
|
||||
// LSN of last checkpoint uploaded by pageserver.
|
||||
uint64 remote_consistent_lsn = 7;
|
||||
uint64 peer_horizon_lsn = 8;
|
||||
uint64 local_start_lsn = 9;
|
||||
uint64 standby_horizon = 14;
|
||||
|
||||
@@ -760,7 +760,6 @@ mod tests {
|
||||
flush_lsn: 1,
|
||||
commit_lsn: 2,
|
||||
backup_lsn: 3,
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
|
||||
@@ -491,7 +491,6 @@ class NeonEnvBuilder:
|
||||
self.test_may_use_compatibility_snapshot_binaries = False
|
||||
self.version_combination = combination
|
||||
self.mixdir = self.test_output_dir / "mixdir_neon"
|
||||
|
||||
if self.version_combination is not None:
|
||||
assert (
|
||||
self.compatibility_neon_binpath is not None
|
||||
@@ -703,11 +702,6 @@ class NeonEnvBuilder:
|
||||
|
||||
def _mix_versions(self):
|
||||
assert self.version_combination is not None, "version combination must be set"
|
||||
|
||||
# Always use a newer version of `neon_local`
|
||||
(self.mixdir / "neon_local").symlink_to(self.neon_binpath / "neon_local")
|
||||
self.neon_local_binpath = self.mixdir
|
||||
|
||||
for component, paths in COMPONENT_BINARIES.items():
|
||||
directory = (
|
||||
self.neon_binpath
|
||||
@@ -717,10 +711,9 @@ class NeonEnvBuilder:
|
||||
for filename in paths:
|
||||
destination = self.mixdir / filename
|
||||
destination.symlink_to(directory / filename)
|
||||
self.neon_binpath = self.mixdir
|
||||
|
||||
if self.version_combination["compute"] == "old":
|
||||
self.pg_distrib_dir = self.compatibility_pg_distrib_dir
|
||||
self.neon_binpath = self.mixdir
|
||||
|
||||
def overlay_mount(self, ident: str, srcdir: Path, dstdir: Path):
|
||||
"""
|
||||
|
||||
@@ -34,7 +34,6 @@ class SafekeeperTimelineStatus:
|
||||
timeline_start_lsn: Lsn
|
||||
backup_lsn: Lsn
|
||||
peer_horizon_lsn: Lsn
|
||||
remote_consistent_lsn: Lsn
|
||||
walreceivers: list[Walreceiver]
|
||||
|
||||
|
||||
@@ -205,7 +204,6 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
|
||||
backup_lsn=Lsn(resj["backup_lsn"]),
|
||||
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
|
||||
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
|
||||
walreceivers=walreceivers,
|
||||
)
|
||||
|
||||
|
||||
@@ -52,11 +52,11 @@ COMPONENT_BINARIES = {
|
||||
# Disable auto-formatting for better readability
|
||||
# fmt: off
|
||||
VERSIONS_COMBINATIONS = (
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"}, # combination: nnnnn
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"}, # combination: ooonn
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"}, # combination: ononn
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"}, # combination: onnnn
|
||||
{"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"}, # combination: nnnoo
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"},
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"},
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"},
|
||||
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"},
|
||||
{"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"},
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
Reference in New Issue
Block a user