Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-30 00:30:37 +00:00)

Compare commits: release-69...remove-pos (15 commits)
Commits (SHA1):
dc8ca6aaa1
af50fd76b7
da16233f64
80466bdca2
248558dee8
3bad52543f
3d64a7ddcd
25f1e5cfeb
8dd555d396
01b6843e12
93987b5a4a
33baca07b6
923974d4da
e7277885b3
80262e724f
.github/workflows/build_and_test.yml (vendored): 6 changes
@@ -839,6 +839,7 @@ jobs:
- name: Build vm image
run: |
./vm-builder \
-size=2G \
-spec=compute/vm-image-spec-${{ matrix.version.debian }}.yaml \
-src=neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-dst=neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}

@@ -1116,7 +1117,10 @@ jobs:
gh workflow --repo neondatabase/infra run deploy-proxy-prod.yml --ref main \
-f deployPgSniRouter=true \
-f deployProxy=true \
-f deployProxyLink=true \
-f deployPrivatelinkProxy=true \
-f deployProxyScram=true \
-f deployProxyAuthBroker=true \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}}
else
@@ -666,7 +666,7 @@ RUN apt-get update && \
#
# Use new version only for v17
# because Release_2024_09_1 has some backward incompatible changes
# https://github.com/rdkit/rdkit/releases/tag/Release_2024_09_1
# https://github.com/rdkit/rdkit/releases/tag/Release_2024_09_1
ENV PATH="/usr/local/pgsql/bin/:/usr/local/pgsql/:$PATH"
RUN case "${PG_VERSION}" in \
"v17") \

@@ -860,13 +860,14 @@ ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
USER nonroot
WORKDIR /home/nonroot

RUN case "${PG_VERSION}" in "v17") \
echo "v17 is not supported yet by pgrx. Quit" && exit 0;; \
esac && \
curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init && \
case "${PG_VERSION}" in \
'v17') \
echo 'v17 is not supported yet by pgrx. Quit' && exit 0;; \
esac && \
cargo install --locked --version 0.11.3 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'

@@ -1041,6 +1042,31 @@ RUN wget https://github.com/pgpartman/pg_partman/archive/refs/tags/v5.1.0.tar.gz
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_partman.control

#########################################################################################
#
# Layer "pg_mooncake"
# compile pg_mooncake extension
#
#########################################################################################
FROM rust-extensions-build AS pg-mooncake-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/

ENV PG_MOONCAKE_VERSION=0a7de4c0b5c7b1a5e2175e1c5f4625b97b7346f1
ENV PATH="/usr/local/pgsql/bin/:$PATH"

RUN case "${PG_VERSION}" in \
'v14') \
echo "pg_mooncake is not supported on Postgres ${PG_VERSION}" && exit 0;; \
esac && \
git clone --depth 1 --branch neon https://github.com/Mooncake-Labs/pg_mooncake.git pg_mooncake-src && \
cd pg_mooncake-src && \
git checkout "${PG_MOONCAKE_VERSION}" && \
git submodule update --init --depth 1 --recursive && \
make BUILD_TYPE=release -j $(getconf _NPROCESSORS_ONLN) && \
make BUILD_TYPE=release -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_mooncake.control

#########################################################################################
#
# Layer "neon-pg-ext-build"

@@ -1084,6 +1110,7 @@ COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/

RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -18,7 +18,7 @@ commands:
- name: pgbouncer
user: postgres
sysvInitAction: respawn
shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini'
shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini 2>&1 > /dev/virtio-ports/tech.neon.log.0'
- name: local_proxy
user: postgres
sysvInitAction: respawn

@@ -18,7 +18,7 @@ commands:
- name: pgbouncer
user: postgres
sysvInitAction: respawn
shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini'
shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini 2>&1 > /dev/virtio-ports/tech.neon.log.0'
- name: local_proxy
user: postgres
sysvInitAction: respawn
@@ -111,6 +111,11 @@ enum Command {
#[arg(long)]
node: NodeId,
},
/// Cancel any ongoing reconciliation for this shard
TenantShardCancelReconcile {
#[arg(long)]
tenant_shard_id: TenantShardId,
},
/// Modify the pageserver tenant configuration of a tenant: this is the configuration structure
/// that is passed through to pageservers, and does not affect storage controller behavior.
TenantConfig {

@@ -535,6 +540,15 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::TenantShardCancelReconcile { tenant_shard_id } => {
storcon_client
.dispatch::<(), ()>(
Method::PUT,
format!("control/v1/tenant/{tenant_shard_id}/cancel_reconcile"),
None,
)
.await?;
}
Command::TenantConfig { tenant_id, config } => {
let tenant_conf = serde_json::from_str(&config)?;
@@ -187,6 +187,8 @@ pub(super) async fn gather_inputs(
// but it is unlikely to cause any issues. In the worst case,
// the calculation will error out.
timelines.retain(|t| t.is_active());
// Also filter out archived timelines.
timelines.retain(|t| t.is_archived() != Some(true));

// Build a map of branch points.
let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
poetry.lock (generated): 10 changes
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.

[[package]]
name = "aiohappyeyeballs"

@@ -3118,13 +3118,13 @@ files = [

[[package]]
name = "werkzeug"
version = "3.0.3"
version = "3.0.6"
description = "The comprehensive WSGI web application library."
optional = false
python-versions = ">=3.8"
files = [
{file = "werkzeug-3.0.3-py3-none-any.whl", hash = "sha256:fc9645dc43e03e4d630d23143a04a7f947a9a3b5727cd535fdfe155a17cc48c8"},
{file = "werkzeug-3.0.3.tar.gz", hash = "sha256:097e5bfda9f0aba8da6b8545146def481d06aa7d3266e7448e2cccf67dd8bd18"},
{file = "werkzeug-3.0.6-py3-none-any.whl", hash = "sha256:1bc0c2310d2fbb07b1dd1105eba2f7af72f322e1e455f2f93c993bee8c8a5f17"},
{file = "werkzeug-3.0.6.tar.gz", hash = "sha256:a8dd59d4de28ca70471a34cba79bed5f7ef2e036a76b3ab0835474246eb41f8d"},
]

[package.dependencies]

@@ -3406,4 +3406,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "f52632571e34b0e51b059c280c35d6ff6f69f6a8c9586caca78282baf635be91"
content-hash = "0f4804119f417edf8e1fbd6d715d2e8d70ad731334fa9570304a2203f83339cf"
@@ -1,5 +1,5 @@
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
use tracing::{debug, info};

use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint};
use crate::auth::{self, AuthFlow};

@@ -21,7 +21,7 @@ pub(crate) async fn authenticate_cleartext(
secret: AuthSecret,
config: &'static AuthenticationConfig,
) -> auth::Result<ComputeCredentials> {
warn!("cleartext auth flow override is enabled, proceeding");
debug!("cleartext auth flow override is enabled, proceeding");
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);

// pause the timer while we communicate with the client

@@ -61,7 +61,7 @@ pub(crate) async fn password_hack_no_authentication(
info: ComputeUserInfoNoEndpoint,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
) -> auth::Result<(ComputeUserInfo, Vec<u8>)> {
warn!("project not specified, resorting to the password hack auth flow");
debug!("project not specified, resorting to the password hack auth flow");
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);

// pause the timer while we communicate with the client
@@ -137,9 +137,6 @@ struct ProxyCliArgs {
/// size of the threadpool for password hashing
#[clap(long, default_value_t = 4)]
scram_thread_pool_size: u8,
/// Disable dynamic rate limiter and store the metrics to ensure its production behaviour.
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
disable_dynamic_rate_limiter: bool,
/// Endpoint rate limiter max number of requests per second.
///
/// Provided in the form `<Requests Per Second>@<Bucket Duration Size>`.

@@ -615,9 +612,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
and metric-collection-interval must be specified"
),
};
if !args.disable_dynamic_rate_limiter {
bail!("dynamic rate limiter should be disabled");
}

let config::ConcurrencyLockOptions {
shards,
@@ -23,7 +23,7 @@ backoff = "^2.2.1"
pytest-lazy-fixture = "^0.6.3"
prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "^3.0.3"
Werkzeug = "^3.0.6"
pytest-order = "^1.1.0"
allure-pytest = "^2.13.2"
pytest-asyncio = "^0.21.0"
@@ -262,14 +262,6 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
check_permission(&request, Some(ttid.tenant_id))?;

let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
// Note: with evicted timelines it should work better then de-evict them and
// stream; probably start_snapshot would copy partial s3 file to dest path
// and stream control file, or return WalResidentTimeline if timeline is not
// evicted.
let tli = tli
.wal_residence_guard()
.await
.map_err(ApiError::InternalServerError)?;

// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
// so create the chan and write to it in another task.
@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
cmp::min,
|
||||
io::{self, ErrorKind},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
|
||||
use tokio_tar::{Archive, Builder, Header};
|
||||
@@ -25,8 +26,8 @@ use crate::{
|
||||
routes::TimelineStatus,
|
||||
},
|
||||
safekeeper::Term,
|
||||
state::TimelinePersistentState,
|
||||
timeline::WalResidentTimeline,
|
||||
state::{EvictionState, TimelinePersistentState},
|
||||
timeline::{Timeline, WalResidentTimeline},
|
||||
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
|
||||
wal_backup,
|
||||
wal_storage::open_wal_file,
|
||||
@@ -43,18 +44,33 @@ use utils::{
|
||||
/// Stream tar archive of timeline to tx.
|
||||
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn stream_snapshot(
|
||||
tli: WalResidentTimeline,
|
||||
tli: Arc<Timeline>,
|
||||
source: NodeId,
|
||||
destination: NodeId,
|
||||
tx: mpsc::Sender<Result<Bytes>>,
|
||||
) {
|
||||
if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await {
|
||||
// Error type/contents don't matter as they won't can't reach the client
|
||||
// (hyper likely doesn't do anything with it), but http stream will be
|
||||
// prematurely terminated. It would be nice to try to send the error in
|
||||
// trailers though.
|
||||
tx.send(Err(anyhow!("snapshot failed"))).await.ok();
|
||||
error!("snapshot failed: {:#}", e);
|
||||
match tli.try_wal_residence_guard().await {
|
||||
Err(e) => {
|
||||
tx.send(Err(anyhow!("Error checking residence: {:#}", e)))
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
Ok(maybe_resident_tli) => {
|
||||
if let Err(e) = match maybe_resident_tli {
|
||||
Some(resident_tli) => {
|
||||
stream_snapshot_resident_guts(resident_tli, source, destination, tx.clone())
|
||||
.await
|
||||
}
|
||||
None => stream_snapshot_offloaded_guts(tli, source, destination, tx.clone()).await,
|
||||
} {
|
||||
// Error type/contents don't matter as they won't can't reach the client
|
||||
// (hyper likely doesn't do anything with it), but http stream will be
|
||||
// prematurely terminated. It would be nice to try to send the error in
|
||||
// trailers though.
|
||||
tx.send(Err(anyhow!("snapshot failed"))).await.ok();
|
||||
error!("snapshot failed: {:#}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,12 +96,10 @@ impl Drop for SnapshotContext {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stream_snapshot_guts(
|
||||
tli: WalResidentTimeline,
|
||||
source: NodeId,
|
||||
destination: NodeId,
|
||||
/// Build a tokio_tar stream that sends encoded bytes into a Bytes channel.
|
||||
fn prepare_tar_stream(
|
||||
tx: mpsc::Sender<Result<Bytes>>,
|
||||
) -> Result<()> {
|
||||
) -> tokio_tar::Builder<impl AsyncWrite + Unpin + Send> {
|
||||
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
|
||||
// use SinkWriter as a Write impl. That is,
|
||||
// - create Sink from the tx. It returns PollSendError if chan is closed.
|
||||
@@ -100,12 +114,38 @@ pub async fn stream_snapshot_guts(
|
||||
// - SinkWriter (not surprisingly) wants sink of &[u8], not bytes, so wrap
|
||||
// into CopyToBytes. This is a data copy.
|
||||
let copy_to_bytes = CopyToBytes::new(oksink);
|
||||
let mut writer = SinkWriter::new(copy_to_bytes);
|
||||
let pinned_writer = std::pin::pin!(writer);
|
||||
let writer = SinkWriter::new(copy_to_bytes);
|
||||
let pinned_writer = Box::pin(writer);
|
||||
|
||||
// Note that tokio_tar append_* funcs use tokio::io::copy with 8KB buffer
|
||||
// which is also likely suboptimal.
|
||||
let mut ar = Builder::new_non_terminated(pinned_writer);
|
||||
Builder::new_non_terminated(pinned_writer)
|
||||
}
|
||||
|
||||
/// Implementation of snapshot for an offloaded timeline, only reads control file
|
||||
pub(crate) async fn stream_snapshot_offloaded_guts(
|
||||
tli: Arc<Timeline>,
|
||||
source: NodeId,
|
||||
destination: NodeId,
|
||||
tx: mpsc::Sender<Result<Bytes>>,
|
||||
) -> Result<()> {
|
||||
let mut ar = prepare_tar_stream(tx);
|
||||
|
||||
tli.snapshot_offloaded(&mut ar, source, destination).await?;
|
||||
|
||||
ar.finish().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Implementation of snapshot for a timeline which is resident (includes some segment data)
|
||||
pub async fn stream_snapshot_resident_guts(
|
||||
tli: WalResidentTimeline,
|
||||
source: NodeId,
|
||||
destination: NodeId,
|
||||
tx: mpsc::Sender<Result<Bytes>>,
|
||||
) -> Result<()> {
|
||||
let mut ar = prepare_tar_stream(tx);
|
||||
|
||||
let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
|
||||
pausable_failpoint!("sk-snapshot-after-list-pausable");
|
||||
@@ -138,6 +178,70 @@ pub async fn stream_snapshot_guts(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
/// Simple snapshot for an offloaded timeline: we will only upload a renamed partial segment and
|
||||
/// pass a modified control file into the provided tar stream (nothing with data segments on disk, since
|
||||
/// we are offloaded and there aren't any)
|
||||
async fn snapshot_offloaded<W: AsyncWrite + Unpin + Send>(
|
||||
self: &Arc<Timeline>,
|
||||
ar: &mut tokio_tar::Builder<W>,
|
||||
source: NodeId,
|
||||
destination: NodeId,
|
||||
) -> Result<()> {
|
||||
// Take initial copy of control file, then release state lock
|
||||
let mut control_file = {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
|
||||
let control_file = TimelinePersistentState::clone(shared_state.sk.state());
|
||||
|
||||
// Rare race: we got unevicted between entering function and reading control file.
|
||||
// We error out and let API caller retry.
|
||||
if !matches!(control_file.eviction_state, EvictionState::Offloaded(_)) {
|
||||
bail!("Timeline was un-evicted during snapshot, please retry");
|
||||
}
|
||||
|
||||
control_file
|
||||
};
|
||||
|
||||
// Modify the partial segment of the in-memory copy for the control file to
|
||||
// point to the destination safekeeper.
|
||||
let replace = control_file
|
||||
.partial_backup
|
||||
.replace_uploaded_segment(source, destination)?;
|
||||
|
||||
let Some(replace) = replace else {
|
||||
// In Manager:: ready_for_eviction, we do not permit eviction unless the timeline
|
||||
// has a partial segment. It is unexpected that
|
||||
anyhow::bail!("Timeline has no partial segment, cannot generate snapshot");
|
||||
};
|
||||
|
||||
tracing::info!("Replacing uploaded partial segment in in-mem control file: {replace:?}");
|
||||
|
||||
// Optimistically try to copy the partial segment to the destination's path: this
|
||||
// can fail if the timeline was un-evicted and modified in the background.
|
||||
let remote_timeline_path = &self.remote_path;
|
||||
wal_backup::copy_partial_segment(
|
||||
&replace.previous.remote_path(remote_timeline_path),
|
||||
&replace.current.remote_path(remote_timeline_path),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Since the S3 copy succeeded with the path given in our control file snapshot, and
|
||||
// we are sending that snapshot in our response, we are giving the caller a consistent
|
||||
// snapshot even if our local Timeline was unevicted or otherwise modified in the meantime.
|
||||
let buf = control_file
|
||||
.write_to_buf()
|
||||
.with_context(|| "failed to serialize control store")?;
|
||||
let mut header = Header::new_gnu();
|
||||
header.set_size(buf.len().try_into().expect("never breaches u64"));
|
||||
ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice())
|
||||
.await
|
||||
.with_context(|| "failed to append to archive")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl WalResidentTimeline {
|
||||
/// Start streaming tar archive with timeline:
|
||||
/// 1) stream control file under lock;
|
||||
|
||||
@@ -21,18 +21,15 @@ use postgres_backend::QueryError;
|
||||
use pq_proto::BeMessage;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::future;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::sync::mpsc::channel;
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::task;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::{Duration, MissedTickBehavior};
|
||||
use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -444,9 +441,9 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
// Send keepalive messages to walproposer, to make sure it receives updates
|
||||
// even when it writes a steady stream of messages.
|
||||
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
|
||||
/// The WAL flush interval. This ensures we periodically flush the WAL and send AppendResponses to
|
||||
/// walproposer, even when it's writing a steady stream of messages.
|
||||
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
|
||||
/// replies to reply_tx.
|
||||
@@ -494,67 +491,76 @@ impl WalAcceptor {
|
||||
async fn run(&mut self) -> anyhow::Result<()> {
|
||||
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
|
||||
|
||||
// After this timestamp we will stop processing AppendRequests and send a response
|
||||
// to the walproposer. walproposer sends at least one AppendRequest per second,
|
||||
// we will send keepalives by replying to these requests once per second.
|
||||
let mut next_keepalive = Instant::now();
|
||||
// Periodically flush the WAL.
|
||||
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
|
||||
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
flush_ticker.tick().await; // skip the initial, immediate tick
|
||||
|
||||
while let Some(mut next_msg) = self.msg_rx.recv().await {
|
||||
// Update walreceiver state in shmem for reporting.
|
||||
if let ProposerAcceptorMessage::Elected(_) = &next_msg {
|
||||
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
|
||||
}
|
||||
// Tracks unflushed appends.
|
||||
let mut dirty = false;
|
||||
|
||||
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
|
||||
// Loop through AppendRequests while available to write as many WAL records as
|
||||
// possible without fsyncing.
|
||||
//
|
||||
// Make sure the WAL is flushed before returning, see:
|
||||
// https://github.com/neondatabase/neon/issues/9259
|
||||
//
|
||||
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
|
||||
// Otherwise, we might end up in a situation where we read a message, but don't
|
||||
// process it.
|
||||
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
|
||||
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
|
||||
if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
break; // disconnected, flush WAL and return on next send/recv
|
||||
}
|
||||
}
|
||||
|
||||
// get out of this loop if keepalive time is reached
|
||||
if Instant::now() >= next_keepalive {
|
||||
loop {
|
||||
let reply = tokio::select! {
|
||||
// Process inbound message.
|
||||
msg = self.msg_rx.recv() => {
|
||||
// If disconnected, break to flush WAL and return.
|
||||
let Some(mut msg) = msg else {
|
||||
break;
|
||||
};
|
||||
|
||||
// Update walreceiver state in shmem for reporting.
|
||||
if let ProposerAcceptorMessage::Elected(_) = &msg {
|
||||
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
|
||||
}
|
||||
|
||||
// continue pulling AppendRequests if available
|
||||
match self.msg_rx.try_recv() {
|
||||
Ok(msg) => next_msg = msg,
|
||||
Err(TryRecvError::Empty) => break,
|
||||
// on disconnect, flush WAL and return on next send/recv
|
||||
Err(TryRecvError::Disconnected) => break,
|
||||
};
|
||||
// Don't flush the WAL on every append, only periodically via flush_ticker.
|
||||
// This batches multiple appends per fsync. If the channel is empty after
|
||||
// sending the reply, we'll schedule an immediate flush.
|
||||
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
|
||||
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
dirty = true;
|
||||
}
|
||||
|
||||
self.tli.process_msg(&msg).await?
|
||||
}
|
||||
|
||||
// flush all written WAL to the disk
|
||||
self.tli
|
||||
.process_msg(&ProposerAcceptorMessage::FlushWAL)
|
||||
.await?
|
||||
} else {
|
||||
// process message other than AppendRequest
|
||||
self.tli.process_msg(&next_msg).await?
|
||||
// While receiving AppendRequests, flush the WAL periodically and respond with an
|
||||
// AppendResponse to let walproposer know we're still alive.
|
||||
_ = flush_ticker.tick(), if dirty => {
|
||||
dirty = false;
|
||||
self.tli
|
||||
.process_msg(&ProposerAcceptorMessage::FlushWAL)
|
||||
.await?
|
||||
}
|
||||
|
||||
// If there are no pending messages, flush the WAL immediately.
|
||||
//
|
||||
// TODO: this should be done via flush_ticker.reset_immediately(), but that's always
|
||||
// delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
|
||||
_ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
|
||||
dirty = false;
|
||||
flush_ticker.reset();
|
||||
self.tli
|
||||
.process_msg(&ProposerAcceptorMessage::FlushWAL)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(reply) = reply_msg {
|
||||
// Send reply, if any.
|
||||
if let Some(reply) = reply {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
break; // disconnected, break to flush WAL and return
|
||||
}
|
||||
// reset keepalive time
|
||||
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
|
||||
}
|
||||
}
|
||||
|
||||
// Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259.
|
||||
if dirty {
|
||||
self.tli
|
||||
.process_msg(&ProposerAcceptorMessage::FlushWAL)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -797,14 +797,17 @@ impl Timeline {
|
||||
state.sk.term_bump(to).await
|
||||
}
|
||||
|
||||
/// Get the timeline guard for reading/writing WAL files.
|
||||
/// If WAL files are not present on disk (evicted), they will be automatically
|
||||
/// downloaded from remote storage. This is done in the manager task, which is
|
||||
/// responsible for issuing all guards.
|
||||
///
|
||||
/// NB: don't use this function from timeline_manager, it will deadlock.
|
||||
/// NB: don't use this function while holding shared_state lock.
|
||||
pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
|
||||
/// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
|
||||
async fn do_wal_residence_guard(
|
||||
self: &Arc<Self>,
|
||||
block: bool,
|
||||
) -> Result<Option<WalResidentTimeline>> {
|
||||
let op_label = if block {
|
||||
"wal_residence_guard"
|
||||
} else {
|
||||
"try_wal_residence_guard"
|
||||
};
|
||||
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
@@ -816,10 +819,13 @@ impl Timeline {
|
||||
// Wait 30 seconds for the guard to be acquired. It can time out if someone is
|
||||
// holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
|
||||
// is stuck.
|
||||
let res = tokio::time::timeout_at(
|
||||
started_at + Duration::from_secs(30),
|
||||
self.manager_ctl.wal_residence_guard(),
|
||||
)
|
||||
let res = tokio::time::timeout_at(started_at + Duration::from_secs(30), async {
|
||||
if block {
|
||||
self.manager_ctl.wal_residence_guard().await.map(Some)
|
||||
} else {
|
||||
self.manager_ctl.try_wal_residence_guard().await
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
let guard = match res {
|
||||
@@ -827,14 +833,14 @@ impl Timeline {
|
||||
let finished_at = Instant::now();
|
||||
let elapsed = finished_at - started_at;
|
||||
MISC_OPERATION_SECONDS
|
||||
.with_label_values(&["wal_residence_guard"])
|
||||
.with_label_values(&[op_label])
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
guard
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
warn!(
|
||||
"error while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
|
||||
"error acquiring in {op_label}, statuses {:?} => {:?}",
|
||||
status_before,
|
||||
self.mgr_status.get()
|
||||
);
|
||||
@@ -842,7 +848,7 @@ impl Timeline {
|
||||
}
|
||||
Err(_) => {
|
||||
warn!(
|
||||
"timeout while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
|
||||
"timeout acquiring in {op_label} guard, statuses {:?} => {:?}",
|
||||
status_before,
|
||||
self.mgr_status.get()
|
||||
);
|
||||
@@ -850,7 +856,28 @@ impl Timeline {
|
||||
}
|
||||
};
|
||||
|
||||
Ok(WalResidentTimeline::new(self.clone(), guard))
|
||||
Ok(guard.map(|g| WalResidentTimeline::new(self.clone(), g)))
|
||||
}
|
||||
|
||||
/// Get the timeline guard for reading/writing WAL files.
|
||||
/// If WAL files are not present on disk (evicted), they will be automatically
|
||||
/// downloaded from remote storage. This is done in the manager task, which is
|
||||
/// responsible for issuing all guards.
|
||||
///
|
||||
/// NB: don't use this function from timeline_manager, it will deadlock.
|
||||
/// NB: don't use this function while holding shared_state lock.
|
||||
pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
|
||||
self.do_wal_residence_guard(true)
|
||||
.await
|
||||
.map(|m| m.expect("Always get Some in block=true mode"))
|
||||
}
|
||||
|
||||
/// Get the timeline guard for reading/writing WAL files if the timeline is resident,
|
||||
/// else return None
|
||||
pub(crate) async fn try_wal_residence_guard(
|
||||
self: &Arc<Self>,
|
||||
) -> Result<Option<WalResidentTimeline>> {
|
||||
self.do_wal_residence_guard(false).await
|
||||
}
|
||||
|
||||
pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
|
||||
|
||||
@@ -56,6 +56,9 @@ impl Manager {
// This also works for the first segment despite last_removed_segno
// being 0 on init because this 0 triggers run of wal_removal_task
// on success of which manager updates the horizon.
//
// **Note** pull_timeline functionality assumes that evicted timelines always have
// a partial segment: if we ever change this condition, must also update that code.
&& self
.partial_backup_uploaded
.as_ref()
@@ -100,6 +100,8 @@ const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
pub enum ManagerCtlMessage {
/// Request to get a guard for WalResidentTimeline, with WAL files available locally.
GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
/// Get a guard for WalResidentTimeline if the timeline is not currently offloaded, else None
TryGuardRequest(tokio::sync::oneshot::Sender<Option<ResidenceGuard>>),
/// Request to drop the guard.
GuardDrop(GuardId),
/// Request to reset uploaded partial backup state.

@@ -110,6 +112,7 @@ impl std::fmt::Debug for ManagerCtlMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
ManagerCtlMessage::TryGuardRequest(_) => write!(f, "TryGuardRequest"),
ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"),
}

@@ -152,6 +155,19 @@ impl ManagerCtl {
.and_then(std::convert::identity)
}

/// Issue a new guard if the timeline is currently not offloaded, else return None
/// Sends a message to the manager and waits for the response.
/// Can be blocked indefinitely if the manager is stuck.
pub async fn try_wal_residence_guard(&self) -> anyhow::Result<Option<ResidenceGuard>> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.manager_tx
.send(ManagerCtlMessage::TryGuardRequest(tx))?;

// wait for the manager to respond with the guard
rx.await
.map_err(|e| anyhow::anyhow!("response read fail: {:?}", e))
}

/// Request timeline manager to reset uploaded partial segment state and
/// wait for the result.
pub async fn backup_partial_reset(&self) -> anyhow::Result<Vec<String>> {

@@ -674,6 +690,17 @@ impl Manager {
warn!("failed to reply with a guard, receiver dropped");
}
}
Some(ManagerCtlMessage::TryGuardRequest(tx)) => {
let result = if self.is_offloaded {
None
} else {
Some(self.access_service.create_guard())
};

if tx.send(result).is_err() {
warn!("failed to reply with a guard, receiver dropped");
}
}
Some(ManagerCtlMessage::GuardDrop(guard_id)) => {
self.access_service.drop_guard(guard_id);
}
@@ -968,6 +968,28 @@ async fn handle_tenant_shard_migrate(
)
}

async fn handle_tenant_shard_cancel_reconcile(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;

let req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};

let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
json_response(
StatusCode::OK,
service
.tenant_shard_cancel_reconcile(tenant_shard_id)
.await?,
)
}

async fn handle_tenant_update_policy(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;

@@ -1776,6 +1798,16 @@ pub fn make_router(
RequestName("control_v1_tenant_migrate"),
)
})
.put(
"/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
|r| {
tenant_service_handler(
r,
handle_tenant_shard_cancel_reconcile,
RequestName("control_v1_tenant_cancel_reconcile"),
)
},
)
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
tenant_service_handler(
r,
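For context, the cancel_reconcile route registered above can be exercised directly over HTTP. A minimal sketch, assuming a locally running storage controller and an admin-scoped token; the address, token, and tenant shard id below are placeholders, not values taken from this diff:

import requests

# Hypothetical values: substitute your storage controller address, an admin-scope
# token, and a real <tenant_id>-<shard_number><shard_count> identifier.
STORCON_API = "http://127.0.0.1:1234"
ADMIN_TOKEN = "<admin-scope-token>"
tenant_shard_id = "<tenant_id>-0102"

# PUT /control/v1/tenant/:tenant_shard_id/cancel_reconcile requires Scope::Admin.
# Per the handler above it responds 200 OK, and the service treats a shard with
# no ongoing Reconciler as a no-op.
resp = requests.put(
    f"{STORCON_API}/control/v1/tenant/{tenant_shard_id}/cancel_reconcile",
    headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
)
resp.raise_for_status()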
@@ -4834,6 +4834,43 @@ impl Service {
Ok(TenantShardMigrateResponse {})
}

/// 'cancel' in this context means cancel any ongoing reconcile
pub(crate) async fn tenant_shard_cancel_reconcile(
&self,
tenant_shard_id: TenantShardId,
) -> Result<(), ApiError> {
// Take state lock and fire the cancellation token, after which we drop lock and wait for any ongoing reconcile to complete
let waiter = {
let locked = self.inner.write().unwrap();
let Some(shard) = locked.tenants.get(&tenant_shard_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant shard not found").into(),
));
};

let waiter = shard.get_waiter();
match waiter {
None => {
tracing::info!("Shard does not have an ongoing Reconciler");
return Ok(());
}
Some(waiter) => {
tracing::info!("Cancelling Reconciler");
shard.cancel_reconciler();
waiter
}
}
};

// Cancellation should be prompt. If this fails we have still done our job of firing the
// cancellation token, but by returning an ApiError we will indicate to the caller that
// the Reconciler is misbehaving and not respecting the cancellation token
self.await_waiters(vec![waiter], SHORT_RECONCILE_TIMEOUT)
.await?;

Ok(())
}

/// This is for debug/support only: we simply drop all state for a tenant, without
/// detaching or deleting it on pageservers.
pub(crate) async fn tenant_drop(&self, tenant_id: TenantId) -> Result<(), ApiError> {
@@ -1317,6 +1317,12 @@ impl TenantShard {
})
}

pub(crate) fn cancel_reconciler(&self) {
if let Some(handle) = self.reconciler.as_ref() {
handle.cancel.cancel()
}
}

/// Get a waiter for any reconciliation in flight, but do not start reconciliation
/// if it is not already running
pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
@@ -40,6 +40,7 @@ from _pytest.fixtures import FixtureRequest
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
from psycopg2.extensions import cursor as PgCursor
|
||||
from psycopg2.extensions import make_dsn, parse_dsn
|
||||
from pytest_httpserver import HTTPServer
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
from fixtures import overlayfs
|
||||
@@ -3098,10 +3099,6 @@ class NeonProxy(PgProtocol):
|
||||
class AuthBackend(abc.ABC):
|
||||
"""All auth backends must inherit from this class"""
|
||||
|
||||
@property
|
||||
def default_conn_url(self) -> Optional[str]:
|
||||
return None
|
||||
|
||||
@abc.abstractmethod
|
||||
def extra_args(self) -> list[str]:
|
||||
pass
|
||||
@@ -3115,7 +3112,7 @@ class NeonProxy(PgProtocol):
|
||||
*["--allow-self-signed-compute", "true"],
|
||||
]
|
||||
|
||||
class Console(AuthBackend):
|
||||
class ControlPlane(AuthBackend):
|
||||
def __init__(self, endpoint: str, fixed_rate_limit: Optional[int] = None):
|
||||
self.endpoint = endpoint
|
||||
self.fixed_rate_limit = fixed_rate_limit
|
||||
@@ -3139,21 +3136,6 @@ class NeonProxy(PgProtocol):
|
||||
]
|
||||
return args
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Postgres(AuthBackend):
|
||||
pg_conn_url: str
|
||||
|
||||
@property
|
||||
def default_conn_url(self) -> Optional[str]:
|
||||
return self.pg_conn_url
|
||||
|
||||
def extra_args(self) -> list[str]:
|
||||
return [
|
||||
# Postgres auth backend params
|
||||
*["--auth-backend", "postgres"],
|
||||
*["--auth-endpoint", self.pg_conn_url],
|
||||
]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
neon_binpath: Path,
|
||||
@@ -3168,7 +3150,7 @@ class NeonProxy(PgProtocol):
|
||||
):
|
||||
host = "127.0.0.1"
|
||||
domain = "proxy.localtest.me" # resolves to 127.0.0.1
|
||||
super().__init__(dsn=auth_backend.default_conn_url, host=domain, port=proxy_port)
|
||||
super().__init__(host=domain, port=proxy_port)
|
||||
|
||||
self.domain = domain
|
||||
self.host = host
|
||||
@@ -3422,20 +3404,39 @@ def static_proxy(
|
||||
port_distributor: PortDistributor,
|
||||
neon_binpath: Path,
|
||||
test_output_dir: Path,
|
||||
httpserver: HTTPServer,
|
||||
) -> Iterator[NeonProxy]:
|
||||
"""Neon proxy that routes directly to vanilla postgres."""
|
||||
"""Neon proxy that routes directly to vanilla postgres and a mocked cplane HTTP API."""
|
||||
|
||||
port = vanilla_pg.default_options["port"]
|
||||
host = vanilla_pg.default_options["host"]
|
||||
dbname = vanilla_pg.default_options["dbname"]
|
||||
auth_endpoint = f"postgres://proxy:password@{host}:{port}/{dbname}"
|
||||
|
||||
# For simplicity, we use the same user for both `--auth-endpoint` and `safe_psql`
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create user proxy with login superuser password 'password'")
|
||||
vanilla_pg.safe_psql("CREATE SCHEMA IF NOT EXISTS neon_control_plane")
|
||||
vanilla_pg.safe_psql(
|
||||
"CREATE TABLE neon_control_plane.endpoints (endpoint_id VARCHAR(255) PRIMARY KEY, allowed_ips VARCHAR(255))"
|
||||
[(rolpassword,)] = vanilla_pg.safe_psql(
|
||||
"select rolpassword from pg_catalog.pg_authid where rolname = 'proxy'"
|
||||
)
|
||||
|
||||
# return local postgres addr on ProxyWakeCompute.
|
||||
httpserver.expect_request("/cplane/proxy_wake_compute").respond_with_json(
|
||||
{
|
||||
"address": f"{host}:{port}",
|
||||
"aux": {
|
||||
"endpoint_id": "ep-foo-bar-1234",
|
||||
"branch_id": "br-foo-bar",
|
||||
"project_id": "foo-bar",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
# return local postgres addr on ProxyWakeCompute.
|
||||
httpserver.expect_request("/cplane/proxy_get_role_secret").respond_with_json(
|
||||
{
|
||||
"role_secret": rolpassword,
|
||||
"allowed_ips": None,
|
||||
"project_id": "foo-bar",
|
||||
}
|
||||
)
|
||||
|
||||
proxy_port = port_distributor.get_port()
|
||||
@@ -3450,8 +3451,12 @@ def static_proxy(
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
external_http_port=external_http_port,
|
||||
auth_backend=NeonProxy.Postgres(auth_endpoint),
|
||||
auth_backend=NeonProxy.ControlPlane(httpserver.url_for("/cplane")),
|
||||
) as proxy:
|
||||
proxy.default_options["user"] = "proxy"
|
||||
proxy.default_options["password"] = "password"
|
||||
proxy.default_options["dbname"] = dbname
|
||||
|
||||
proxy.start()
|
||||
yield proxy
|
||||
|
||||
|
||||
@@ -6,20 +6,27 @@ from fixtures.neon_fixtures import (
|
||||
NeonProxy,
|
||||
VanillaPostgres,
|
||||
)
|
||||
from pytest_httpserver import HTTPServer
|
||||
|
||||
TABLE_NAME = "neon_control_plane.endpoints"
|
||||
|
||||
|
||||
# Proxy uses the same logic for psql and websockets.
|
||||
@pytest.mark.asyncio
|
||||
async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: VanillaPostgres):
|
||||
# Shouldn't be able to connect to this project
|
||||
vanilla_pg.safe_psql(
|
||||
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('private-project', '8.8.8.8')"
|
||||
def test_proxy_psql_not_allowed_ips(
|
||||
static_proxy: NeonProxy,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
httpserver: HTTPServer,
|
||||
):
|
||||
[(rolpassword,)] = vanilla_pg.safe_psql(
|
||||
"select rolpassword from pg_catalog.pg_authid where rolname = 'proxy'"
|
||||
)
|
||||
# Should be able to connect to this project
|
||||
vanilla_pg.safe_psql(
|
||||
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('generic-project', '::1,127.0.0.1')"
|
||||
|
||||
# Shouldn't be able to connect to this project
|
||||
httpserver.expect_request("/cplane/proxy_get_role_secret").respond_with_json(
|
||||
{
|
||||
"role_secret": rolpassword,
|
||||
"allowed_ips": ["8.8.8.8"],
|
||||
"project_id": "foo-bar",
|
||||
}
|
||||
)
|
||||
|
||||
def check_cannot_connect(**kwargs):
|
||||
@@ -37,6 +44,25 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
|
||||
# with SNI
|
||||
check_cannot_connect(query="select 1", host="private-project.localtest.me")
|
||||
|
||||
|
||||
def test_proxy_psql_allowed_ips(
|
||||
static_proxy: NeonProxy,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
httpserver: HTTPServer,
|
||||
):
|
||||
[(rolpassword,)] = vanilla_pg.safe_psql(
|
||||
"select rolpassword from pg_catalog.pg_authid where rolname = 'proxy'"
|
||||
)
|
||||
|
||||
# Should be able to connect to this project
|
||||
httpserver.expect_request("/cplane/proxy_get_role_secret").respond_with_json(
|
||||
{
|
||||
"role_secret": rolpassword,
|
||||
"allowed_ips": ["::1", "127.0.0.1"],
|
||||
"project_id": "foo-bar",
|
||||
}
|
||||
)
|
||||
|
||||
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
|
||||
out = static_proxy.safe_psql(query="select 1", sslsni=0, options="project=generic-project")
|
||||
assert out[0][0] == 1
|
||||
@@ -50,27 +76,61 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
|
||||
assert out[0][0] == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_proxy_http_allowed_ips(static_proxy: NeonProxy, vanilla_pg: VanillaPostgres):
|
||||
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
|
||||
def test_proxy_http_not_allowed_ips(
|
||||
static_proxy: NeonProxy,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
httpserver: HTTPServer,
|
||||
):
|
||||
vanilla_pg.safe_psql("create user http_auth with password 'http' superuser")
|
||||
|
||||
# Shouldn't be able to connect to this project
|
||||
vanilla_pg.safe_psql(
|
||||
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('proxy', '8.8.8.8')"
|
||||
[(rolpassword,)] = vanilla_pg.safe_psql(
|
||||
"select rolpassword from pg_catalog.pg_authid where rolname = 'http_auth'"
|
||||
)
|
||||
|
||||
def query(status: int, query: str, *args):
|
||||
httpserver.expect_oneshot_request("/cplane/proxy_get_role_secret").respond_with_json(
|
||||
{
|
||||
"role_secret": rolpassword,
|
||||
"allowed_ips": ["8.8.8.8"],
|
||||
"project_id": "foo-bar",
|
||||
}
|
||||
)
|
||||
|
||||
with httpserver.wait() as waiting:
|
||||
static_proxy.http_query(
|
||||
query,
|
||||
args,
|
||||
"select 1;",
|
||||
[],
|
||||
user="http_auth",
|
||||
password="http",
|
||||
expected_code=status,
|
||||
expected_code=400,
|
||||
)
|
||||
assert waiting.result
|
||||
|
||||
query(400, "select 1;") # ip address is not allowed
|
||||
# Should be able to connect to this project
|
||||
vanilla_pg.safe_psql(
|
||||
f"UPDATE {TABLE_NAME} SET allowed_ips = '8.8.8.8,127.0.0.1' WHERE endpoint_id = 'proxy'"
|
||||
|
||||
def test_proxy_http_allowed_ips(
|
||||
static_proxy: NeonProxy,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
httpserver: HTTPServer,
|
||||
):
|
||||
vanilla_pg.safe_psql("create user http_auth with password 'http' superuser")
|
||||
|
||||
[(rolpassword,)] = vanilla_pg.safe_psql(
|
||||
"select rolpassword from pg_catalog.pg_authid where rolname = 'http_auth'"
|
||||
)
|
||||
query(200, "select 1;") # should work now
|
||||
|
||||
httpserver.expect_oneshot_request("/cplane/proxy_get_role_secret").respond_with_json(
|
||||
{
|
||||
"role_secret": rolpassword,
|
||||
"allowed_ips": ["8.8.8.8", "127.0.0.1"],
|
||||
"project_id": "foo-bar",
|
||||
}
|
||||
)
|
||||
|
||||
with httpserver.wait() as waiting:
|
||||
static_proxy.http_query(
|
||||
"select 1;",
|
||||
[],
|
||||
user="http_auth",
|
||||
password="http",
|
||||
expected_code=200,
|
||||
)
|
||||
assert waiting.result
|
||||
|
||||
@@ -18,6 +18,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
PageserverAvailability,
|
||||
PageserverSchedulingPolicy,
|
||||
PgBin,
|
||||
@@ -298,17 +299,20 @@ def test_storage_controller_restart(neon_env_builder: NeonEnvBuilder):
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("warm_up", [True, False])
|
||||
def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up: bool):
|
||||
def prepare_onboarding_env(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
) -> tuple[NeonEnv, NeonPageserver, TenantId, int]:
|
||||
"""
|
||||
We onboard tenants to the sharding service by treating it as a 'virtual pageserver'
|
||||
which provides the /location_config API. This is similar to creating a tenant,
|
||||
but imports the generation number.
|
||||
For tests that do onboarding of a tenant to the storage controller, a small dance to
|
||||
set up one pageserver that won't be managed by the storage controller and create
|
||||
a tenant there.
|
||||
"""
|
||||
|
||||
# One pageserver to simulate legacy environment, two to be managed by storage controller
|
||||
neon_env_builder.num_pageservers = 3
|
||||
|
||||
# Enable tests to use methods that require real S3 API
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
|
||||
# Start services by hand so that we can skip registration on one of the pageservers
|
||||
env = neon_env_builder.init_configs()
|
||||
env.broker.start()
|
||||
@@ -329,7 +333,6 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
|
||||
# will be attached after onboarding
|
||||
env.pageservers[1].start()
|
||||
env.pageservers[2].start()
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
@@ -339,6 +342,23 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
|
||||
generation = 123
|
||||
origin_ps.tenant_create(tenant_id, generation=generation)
|
||||
|
||||
origin_ps.http_client().timeline_create(PgVersion.NOT_SET, tenant_id, TimelineId.generate())
|
||||
|
||||
return (env, origin_ps, tenant_id, generation)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("warm_up", [True, False])
|
||||
def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up: bool):
|
||||
"""
|
||||
We onboard tenants to the sharding service by treating it as a 'virtual pageserver'
|
||||
which provides the /location_config API. This is similar to creating a tenant,
|
||||
but imports the generation number.
|
||||
"""
|
||||
|
||||
env, origin_ps, tenant_id, generation = prepare_onboarding_env(neon_env_builder)
|
||||
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
|
||||
# As if doing a live migration, first configure origin into stale mode
|
||||
r = origin_ps.http_client().tenant_location_conf(
|
||||
tenant_id,
|
||||
@@ -475,6 +495,70 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this test doesn't start an endpoint")
|
||||
def test_storage_controller_onboard_detached(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Sometimes, the control plane wants to delete a tenant that wasn't attached to any pageserver,
|
||||
and also wasn't ever registered with the storage controller.
|
||||
|
||||
It may do this by calling /location_conf in mode Detached and then calling the delete API
|
||||
as normal.
|
||||
"""
|
||||
|
||||
env, origin_ps, tenant_id, generation = prepare_onboarding_env(neon_env_builder)
|
||||
|
||||
remote_prefix = "/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
)
|
||||
|
||||
# Detach it from its original pageserver.
|
||||
origin_ps.http_client().tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Detached",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": None,
|
||||
},
|
||||
)
|
||||
|
||||
# Since we will later assert that remote data is gone, as a control also check it was ever there
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix=remote_prefix,
|
||||
)
|
||||
|
||||
# Register with storage controller in Detached state
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
generation += 1
|
||||
r = virtual_ps_http.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Detached",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": generation,
|
||||
},
|
||||
)
|
||||
assert len(r["shards"]) == 0 # location_conf tells us there are no attached shards
|
||||
|
||||
# Onboarding in Detached state shouldn't have attached it to any pageserver
|
||||
for ps in env.pageservers:
|
||||
assert ps.http_client().tenant_list() == []
|
||||
|
||||
# Delete it via the storage controller
|
||||
virtual_ps_http.tenant_delete(tenant_id)
|
||||
|
||||
# Check that we really deleted it
|
||||
assert_prefix_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix=remote_prefix,
|
||||
)
|
||||
|
||||
|
||||
def test_storage_controller_compute_hook(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
@@ -872,6 +956,14 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
|
||||
assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
|
||||
assert all(v["may_schedule"] for v in response.json()["nodes"].values())
|
||||
|
||||
# Reconciler cancel API should be a no-op when nothing is in flight
|
||||
env.storage_controller.request(
|
||||
"PUT",
|
||||
f"{env.storage_controller_api}/control/v1/tenant/{tenant_id}-0102/cancel_reconcile",
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
# Node unclean drop API
|
||||
response = env.storage_controller.request(
|
||||
"POST",
|
||||
f"{env.storage_controller_api}/debug/v1/node/{env.pageservers[1].id}/drop",
|
||||
@@ -879,6 +971,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
assert len(env.storage_controller.node_list()) == 1
|
||||
|
||||
# Tenant unclean drop API
|
||||
response = env.storage_controller.request(
|
||||
"POST",
|
||||
f"{env.storage_controller_api}/debug/v1/tenant/{tenant_id}/drop",
|
||||
@@ -892,7 +985,6 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
)
|
||||
assert len(response.json()) == 1
|
||||
|
||||
# Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're
|
||||
# meant to be unclean wrt the pageserver state, but not leave a broken storage controller behind.
|
||||
env.storage_controller.consistency_check()
|
||||
@@ -1660,6 +1752,11 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
|
||||
storcon_cli(["tenant-policy", "--tenant-id", str(env.initial_tenant), "--scheduling", "stop"])
|
||||
assert "Stop" in storcon_cli(["tenants"])[3]
|
||||
|
||||
# Cancel ongoing reconcile on a tenant
|
||||
storcon_cli(
|
||||
["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{env.initial_tenant}-0104"]
|
||||
)
|
||||
|
||||
# Change a tenant's placement
|
||||
storcon_cli(
|
||||
["tenant-policy", "--tenant-id", str(env.initial_tenant), "--placement", "secondary"]
|
||||
|
||||
@@ -1998,6 +1998,109 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder):
|
||||
pt_handle.join()
|
||||
|
||||
|
||||
def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Verify that when pull_timeline is used on an evicted timeline, it does not result in
|
||||
promoting any segments to local disk on the source, and the timeline is correctly instantiated
|
||||
in evicted state on the destination. This behavior is important to avoid ballooning disk
|
||||
usage when doing mass migration of timelines.
|
||||
"""
|
||||
neon_env_builder.num_safekeepers = 4
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
|
||||
# Configure safekeepers with ultra-fast eviction policy
|
||||
neon_env_builder.safekeeper_extra_opts = [
|
||||
"--enable-offload",
|
||||
"--partial-backup-timeout",
|
||||
"50ms",
|
||||
"--control-file-save-interval",
|
||||
"1s",
|
||||
# Safekeepers usually wait a while before evicting something: for this test we want them to
|
||||
# evict things as soon as they are inactive.
|
||||
"--eviction-min-resident=100ms",
|
||||
"--delete-offloaded-wal",
|
||||
]
|
||||
|
||||
initial_tenant_conf = {"lagging_wal_timeout": "1s", "checkpoint_timeout": "100ms"}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[-1])
|
||||
log.info(f"Will pull_timeline on destination {dst_sk.id} from source {src_sk.id}")
|
||||
|
||||
ep = env.endpoints.create("main")
|
||||
ep.active_safekeepers = [s.id for s in env.safekeepers if s.id != dst_sk.id]
|
||||
log.info(f"Compute writing initially to safekeepers: {ep.active_safekeepers}")
|
||||
ep.active_safekeepers = [1, 2, 3] # Exclude dst_sk from set written by compute initially
|
||||
ep.start()
|
||||
ep.safe_psql("CREATE TABLE t(i int)")
|
||||
ep.safe_psql("INSERT INTO t VALUES (0)")
|
||||
ep.stop()
|
||||
|
||||
wait_lsn_force_checkpoint_at_sk(src_sk, tenant_id, timeline_id, env.pageserver)
|
||||
|
||||
src_http = src_sk.http_client()
|
||||
dst_http = dst_sk.http_client()
|
||||
|
||||
def evicted_on_source():
|
||||
# Wait for timeline to go into evicted state
|
||||
assert src_http.get_eviction_state(timeline_id) != "Present"
|
||||
assert (
|
||||
src_http.get_metric_value(
|
||||
"safekeeper_eviction_events_completed_total", {"kind": "evict"}
|
||||
)
|
||||
or 0 > 0
|
||||
)
|
||||
assert src_http.get_metric_value("safekeeper_evicted_timelines") or 0 > 0
|
||||
# Check that on source no segment files are present
|
||||
assert src_sk.list_segments(tenant_id, timeline_id) == []
|
||||
|
||||
wait_until(60, 1, evicted_on_source)
|
||||
|
||||
# Invoke pull_timeline: source should serve snapshot request without promoting anything to local disk,
|
||||
# destination should import the control file only & go into evicted mode immediately
|
||||
dst_sk.pull_timeline([src_sk], tenant_id, timeline_id)
|
||||
|
||||
# Check that on source and destination no segment files are present
|
||||
assert src_sk.list_segments(tenant_id, timeline_id) == []
|
||||
assert dst_sk.list_segments(tenant_id, timeline_id) == []
|
||||
|
||||
# Check that the timeline on the destination is in the expected evicted state.
|
||||
evicted_on_source() # It should still be evicted on the source
|
||||
|
||||
def evicted_on_destination():
|
||||
assert dst_http.get_eviction_state(timeline_id) != "Present"
|
||||
assert dst_http.get_metric_value("safekeeper_evicted_timelines") or 0 > 0
|
||||
|
||||
# This should be fast, it is a wait_until because eviction state is updated
|
||||
# in the background wrt pull_timeline.
|
||||
wait_until(10, 0.1, evicted_on_destination)
|
||||
|
||||
# Delete the timeline on the source, to prove that deletion works on an
|
||||
# evicted timeline _and_ that the final compute test is really not using
|
||||
# the original location
|
||||
src_sk.http_client().timeline_delete(tenant_id, timeline_id, only_local=True)
|
||||
|
||||
# Check that using the timeline correctly un-evicts it on the new location
|
||||
ep.active_safekeepers = [2, 3, 4]
|
||||
ep.start()
|
||||
ep.safe_psql("INSERT INTO t VALUES (0)")
|
||||
ep.stop()
|
||||
|
||||
def unevicted_on_dest():
|
||||
assert (
|
||||
dst_http.get_metric_value(
|
||||
"safekeeper_eviction_events_completed_total", {"kind": "restore"}
|
||||
)
|
||||
or 0 > 0
|
||||
)
|
||||
n_evicted = dst_sk.http_client().get_metric_value("safekeeper_evicted_timelines")
|
||||
assert n_evicted == 0
|
||||
|
||||
wait_until(10, 1, unevicted_on_dest)
|
||||
|
||||
|
||||
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
|
||||
# when compute is active, but there are no writes to the timeline. In that case
|
||||
# pageserver should maintain a single connection to safekeeper and don't attempt
|
||||
|
||||