Mirror of https://github.com/neondatabase/neon.git
Synced 2026-02-02 02:00:38 +00:00

Compare commits: 5 commits (erik/batch ... cloneable/)
| Author | SHA1 | Date |
|---|---|---|
| | 0896dcf3d4 | |
| | d06bf4b0fe | |
| | 1280b708f1 | |
| | b4e00b8b22 | |
| | 10aaa3677d | |
Cargo.lock (generated): 4 lines changed
@@ -994,9 +994,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"

[[package]]
name = "bytes"
version = "1.8.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
dependencies = [
"serde",
]
@@ -55,6 +55,7 @@ RUN set -e \
--bin proxy \
--bin neon_local \
--bin storage_scrubber \
--bin stream_events \
--locked --release

# Build final image
@@ -82,6 +83,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_control
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/stream_events /usr/local/bin

COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/
@@ -1,3 +1,45 @@
commit 00aa659afc9c7336ab81036edec3017168aabf40
Author: Heikki Linnakangas <heikki@neon.tech>
Date: Tue Nov 12 16:59:19 2024 +0200

Temporarily disable test that depends on timezone

diff --git a/tests/expected/generalization.out b/tests/expected/generalization.out
index 23ef5fa..9e60deb 100644
--- a/ext-src/pg_anon-src/tests/expected/generalization.out
+++ b/ext-src/pg_anon-src/tests/expected/generalization.out
@@ -284,12 +284,9 @@ SELECT anon.generalize_tstzrange('19041107','century');
["Tue Jan 01 00:00:00 1901 PST","Mon Jan 01 00:00:00 2001 PST")
(1 row)

-SELECT anon.generalize_tstzrange('19041107','millennium');
- generalize_tstzrange
------------------------------------------------------------------
- ["Thu Jan 01 00:00:00 1001 PST","Mon Jan 01 00:00:00 2001 PST")
-(1 row)
-
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');
-- generalize_daterange
SELECT anon.generalize_daterange('19041107');
generalize_daterange
diff --git a/tests/sql/generalization.sql b/tests/sql/generalization.sql
index b868344..b4fc977 100644
--- a/ext-src/pg_anon-src/tests/sql/generalization.sql
+++ b/ext-src/pg_anon-src/tests/sql/generalization.sql
@@ -61,7 +61,9 @@ SELECT anon.generalize_tstzrange('19041107','month');
SELECT anon.generalize_tstzrange('19041107','year');
SELECT anon.generalize_tstzrange('19041107','decade');
SELECT anon.generalize_tstzrange('19041107','century');
-SELECT anon.generalize_tstzrange('19041107','millennium');
+-- temporarily disabled, see:
+-- https://gitlab.com/dalibo/postgresql_anonymizer/-/commit/199f0a392b37c59d92ae441fb8f037e094a11a52#note_2148017485
+--SELECT anon.generalize_tstzrange('19041107','millennium');

-- generalize_daterange
SELECT anon.generalize_daterange('19041107');

commit 7dd414ee75f2875cffb1d6ba474df1f135a6fc6f
Author: Alexey Masterov <alexeymasterov@neon.tech>
Date: Fri May 31 06:34:26 2024 +0000
@@ -15,6 +15,9 @@ pub enum DownloadError {
///
/// Concurrency control is not timed within timeout.
Timeout,
/// Some integrity/consistency check failed during download. This is used during
/// timeline loads to cancel the load of a tenant if some timeline detects fatal corruption.
Fatal(String),
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
@@ -29,6 +32,7 @@ impl std::fmt::Display for DownloadError {
DownloadError::Unmodified => write!(f, "File was not modified"),
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
DownloadError::Timeout => write!(f, "timeout"),
DownloadError::Fatal(why) => write!(f, "Fatal read error: {why}"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
}
}
@@ -41,7 +45,7 @@ impl DownloadError {
pub fn is_permanent(&self) -> bool {
use DownloadError::*;
match self {
BadInput(_) | NotFound | Unmodified | Cancelled => true,
BadInput(_) | NotFound | Unmodified | Fatal(_) | Cancelled => true,
Timeout | Other(_) => false,
}
}
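The new Fatal variant matters because is_permanent() now reports it as non-retryable. Below is a minimal sketch of how such a classification is typically consumed by a retry loop; it uses a simplified local error type and is not the pageserver's actual retry helper.

```rust
// Simplified stand-in for the error type above; only the variants needed for the sketch.
#[derive(Debug)]
enum DownloadError {
    NotFound,
    Timeout,
    Fatal(String),
    Other(String),
}

impl DownloadError {
    fn is_permanent(&self) -> bool {
        use DownloadError::*;
        match self {
            // Permanent: retrying cannot help, so the caller should give up immediately.
            NotFound | Fatal(_) => true,
            // Transient: worth another attempt.
            Timeout | Other(_) => false,
        }
    }
}

fn download_with_retries(
    mut attempt: impl FnMut() -> Result<Vec<u8>, DownloadError>,
    max_attempts: usize,
) -> Result<Vec<u8>, DownloadError> {
    let mut last_err = None;
    for _ in 0..max_attempts {
        match attempt() {
            Ok(bytes) => return Ok(bytes),
            // Fail fast on permanent errors such as a Fatal integrity failure.
            Err(e) if e.is_permanent() => return Err(e),
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err.unwrap_or(DownloadError::Other("no attempts made".into())))
}

fn main() {
    let mut calls = 0;
    let result = download_with_retries(
        || {
            calls += 1;
            Err(DownloadError::Fatal("index generation mismatch".into()))
        },
        5,
    );
    assert!(result.is_err());
    assert_eq!(calls, 1); // gave up on the first permanent error instead of retrying
}
```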
@@ -1433,6 +1433,12 @@ impl Tenant {
info!(%timeline_id, "index_part not found on remote");
continue;
}
Err(DownloadError::Fatal(why)) => {
// If, while loading one remote timeline, we saw an indication that our generation
// number is likely invalid, then we should not load the whole tenant.
error!(%timeline_id, "Fatal error loading timeline: {why}");
anyhow::bail!(why.to_string());
}
Err(e) => {
// Some (possibly ephemeral) error happened during index_part download.
// Pretend the timeline exists to not delete the timeline directory,

@@ -574,12 +574,18 @@ impl RemoteTimelineClient {

if latest_index_generation > index_generation {
// Unexpected! Why are we loading such an old index if a more recent one exists?
tracing::warn!(
// We will refuse to proceed, as there is no reasonable scenario where this should happen, but
// there _is_ a clear bug/corruption scenario where it would happen (controller sets the generation
// backwards).
tracing::error!(
?index_generation,
?latest_index_generation,
?latest_index_mtime,
"Found a newer index while loading an old one"
);
return Err(DownloadError::Fatal(
"Index age exceeds threshold and a newer index exists".into(),
));
}
}
proxy/src/bin/stream_events.rs (new file, 450 lines)
@@ -0,0 +1,450 @@
use std::sync::Arc;

use anyhow::bail;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::meta::region::RegionProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use aws_config::Region;
use clap::{Parser, ValueEnum};
use proxy::config::{self, remote_storage_from_toml, ProxyProtocolV2};
use proxy::context::parquet::ParquetUploadArgs;
use proxy::rate_limiter::RateBucketInfo;
use proxy::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use proxy::redis::elasticache;
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::{AsyncCommands, FromRedisValue, Value};
use remote_storage::RemoteStorageConfig;
use serde::{Deserialize, Serialize};
use tracing::warn;

#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[derive(Clone, Debug, ValueEnum)]
enum AuthBackendType {
#[value(name("console"), alias("cplane"))]
ControlPlane,

#[value(name("link"), alias("control-redirect"))]
ConsoleRedirect,

#[cfg(feature = "testing")]
Postgres,
}

/// Neon proxy/router
#[derive(Parser)]
struct ProxyCliArgs {
/// Name of the region this proxy is deployed in
#[clap(long, default_value_t = String::new())]
region: String,
/// listen for incoming client connections on ip:port
#[clap(short, long, default_value = "127.0.0.1:4432")]
proxy: String,
#[clap(value_enum, long, default_value_t = AuthBackendType::ConsoleRedirect)]
auth_backend: AuthBackendType,
/// listen for management callback connection on ip:port
#[clap(short, long, default_value = "127.0.0.1:7000")]
mgmt: String,
/// listen for incoming http connections (metrics, etc) on ip:port
#[clap(long, default_value = "127.0.0.1:7001")]
http: String,
/// listen for incoming wss connections on ip:port
#[clap(long)]
wss: Option<String>,
/// redirect unauthenticated users to the given uri in case of console redirect auth
#[clap(short, long, default_value = "http://localhost:3000/psql_session/")]
uri: String,
/// cloud API endpoint for authenticating users
#[clap(
short,
long,
default_value = "http://localhost:3000/authenticate_proxy_request/"
)]
auth_endpoint: String,
/// JWT used to connect to control plane.
#[clap(
long,
value_name = "JWT",
default_value = "",
env = "NEON_PROXY_TO_CONTROLPLANE_TOKEN"
)]
control_plane_token: Arc<str>,
/// if this is not local proxy, this toggles whether we accept jwt or passwords for http
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_auth_broker: bool,
/// path to TLS key for client postgres connections
///
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
#[clap(short = 'k', long, alias = "ssl-key")]
tls_key: Option<String>,
/// path to TLS cert for client postgres connections
///
/// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
#[clap(short = 'c', long, alias = "ssl-cert")]
tls_cert: Option<String>,
/// path to directory with TLS certificates for client postgres connections
#[clap(long)]
certs_dir: Option<String>,
/// timeout for the TLS handshake
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
handshake_timeout: tokio::time::Duration,
/// http endpoint to receive periodic metric updates
#[clap(long)]
metric_collection_endpoint: Option<String>,
/// how often metrics should be sent to a collection endpoint
#[clap(long)]
metric_collection_interval: Option<String>,
/// cache for `wake_compute` api method (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
wake_compute_cache: String,
/// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
#[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
wake_compute_lock: String,
/// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
#[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
connect_compute_lock: String,
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
#[clap(flatten)]
sql_over_http: SqlOverHttpArgs,
/// timeout for scram authentication protocol
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
scram_protocol_timeout: tokio::time::Duration,
/// size of the threadpool for password hashing
#[clap(long, default_value_t = 4)]
scram_thread_pool_size: u8,
/// Endpoint rate limiter max number of requests per second.
///
/// Provided in the form `<Requests Per Second>@<Bucket Duration Size>`.
/// Can be given multiple times for different bucket sizes.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_ENDPOINT_SET)]
endpoint_rps_limit: Vec<RateBucketInfo>,
/// Wake compute rate limiter max number of requests per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
wake_compute_limit: Vec<RateBucketInfo>,
/// Whether the auth rate limiter actually takes effect (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
auth_rate_limit_enabled: bool,
/// Authentication rate limiter max number of hashes per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_AUTH_SET)]
auth_rate_limit: Vec<RateBucketInfo>,
/// The IP subnet to use when considering whether two IP addresses are considered the same.
#[clap(long, default_value_t = 64)]
auth_rate_limit_ip_subnet: u8,
/// Redis rate limiter max number of requests per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// cache for `allowed_ips` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
allowed_ips_cache: String,
/// cache for `role_secret` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
role_secret_cache: String,
/// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections)
#[clap(long)]
redis_notifications: Option<String>,
/// what from the available authentications type to use for the regional redis we have. Supported are "irsa" and "plain".
#[clap(long, default_value = "irsa")]
redis_auth_type: String,
/// redis host for streaming connections (might be different from the notifications host)
#[clap(long)]
redis_host: Option<String>,
/// redis port for streaming connections (might be different from the notifications host)
#[clap(long)]
redis_port: Option<u16>,
/// redis cluster name, used in aws elasticache
#[clap(long)]
redis_cluster_name: Option<String>,
/// redis user_id, used in aws elasticache
#[clap(long)]
redis_user_id: Option<String>,
/// aws region to retrieve credentials
#[clap(long, default_value_t = String::new())]
aws_region: String,
/// cache for `project_info` (use `size=0` to disable)
#[clap(long, default_value = config::ProjectInfoCacheOptions::CACHE_DEFAULT_OPTIONS)]
project_info_cache: String,
/// cache for all valid endpoints
#[clap(long, default_value = config::EndpointCacheConfig::CACHE_DEFAULT_OPTIONS)]
endpoint_cache_config: String,
#[clap(flatten)]
parquet_upload: ParquetUploadArgs,

/// interval for backup metric collection
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
metric_backup_collection_interval: std::time::Duration,
/// remote storage configuration for backup metric collection
/// Encoded as toml (same format as pageservers), eg
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
#[clap(long, value_parser = remote_storage_from_toml)]
metric_backup_collection_remote_storage: Option<RemoteStorageConfig>,
/// chunk size for backup metric collection
/// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
#[clap(long, default_value = "4194304")]
metric_backup_collection_chunk_size: usize,
/// Whether to retry the connection to the compute node
#[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)]
connect_to_compute_retry: String,
/// Whether to retry the wake_compute request
#[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
wake_compute_retry: String,

/// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_private_access_proxy: bool,

/// Configure whether all incoming requests have a Proxy Protocol V2 packet.
// TODO(conradludgate): switch default to rejected or required once we've updated all deployments
#[clap(value_enum, long, default_value_t = ProxyProtocolV2::Supported)]
proxy_protocol_v2: ProxyProtocolV2,

/// Time the proxy waits for the webauth session to be confirmed by the control plane.
// TODO: rename to `console_redirect_confirmation_timeout`.
#[clap(long, default_value = "2m", value_parser = humantime::parse_duration)]
webauth_confirmation_timeout: std::time::Duration,
}

#[derive(clap::Args, Clone, Copy, Debug)]
struct SqlOverHttpArgs {
/// timeout for http connection requests
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
sql_over_http_timeout: tokio::time::Duration,

/// Whether the SQL over http pool is opt-in
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
sql_over_http_pool_opt_in: bool,

/// How many connections to pool for each endpoint. Excess connections are discarded
#[clap(long, default_value_t = 20)]
sql_over_http_pool_max_conns_per_endpoint: usize,

/// How many connections to pool for each endpoint. Excess connections are discarded
#[clap(long, default_value_t = 20000)]
sql_over_http_pool_max_total_conns: usize,

/// How long pooled connections should remain idle for before closing
#[clap(long, default_value = "5m", value_parser = humantime::parse_duration)]
sql_over_http_idle_timeout: tokio::time::Duration,

/// Duration each shard will wait on average before a GC sweep.
/// A longer time will causes sweeps to take longer but will interfere less frequently.
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
sql_over_http_pool_gc_epoch: tokio::time::Duration,

/// How many shards should the global pool have. Must be a power of two.
/// More shards will introduce less contention for pool operations, but can
/// increase memory used by the pool
#[clap(long, default_value_t = 128)]
sql_over_http_pool_shards: usize,

#[clap(long, default_value_t = 10000)]
sql_over_http_client_conn_threshold: u64,

#[clap(long, default_value_t = 64)]
sql_over_http_cancel_set_shards: usize,

#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
sql_over_http_max_request_size_bytes: u64,

#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
sql_over_http_max_response_size_bytes: usize,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();

let args = ProxyCliArgs::parse();

let region_provider =
RegionProviderChain::default_provider().or_else(Region::new(args.aws_region.clone()));
let provider_conf =
ProviderConfig::without_region().with_region(region_provider.region().await);
let aws_credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
CredentialsProviderChain::first_try("env", EnvironmentVariableCredentialsProvider::new())
// uses "AWS_PROFILE" / `aws sso login --profile <profile>`
.or_else(
"profile-sso",
ProfileFileCredentialsProvider::builder()
.configure(&provider_conf)
.build(),
)
// uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
// needed to access remote extensions bucket
.or_else(
"token",
WebIdentityTokenCredentialsProvider::builder()
.configure(&provider_conf)
.build(),
)
// uses imds v2
.or_else("imds", ImdsCredentialsProvider::builder().build())
};
let elasticache_credentials_provider = Arc::new(elasticache::CredentialsProvider::new(
elasticache::AWSIRSAConfig::new(
args.aws_region.clone(),
args.redis_cluster_name,
args.redis_user_id,
),
aws_credentials_provider,
));
let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
("plain", redis_url) => match redis_url {
None => {
bail!("plain auth requires redis_notifications to be set");
}
Some(url) => Some(
ConnectionWithCredentialsProvider::new_with_static_credentials(url.to_string()),
),
},
("irsa", _) => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(
ConnectionWithCredentialsProvider::new_with_credentials_provider(
host.to_string(),
port,
elasticache_credentials_provider.clone(),
),
),
(None, None) => {
warn!("irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client");
None
}
_ => {
bail!("redis-host and redis-port must be specified together");
}
},
_ => {
bail!("unknown auth type given");
}
};

let endpoint_cache_config: config::EndpointCacheConfig = args.endpoint_cache_config.parse()?;

let Some(mut regional_redis_client) = regional_redis_client else {
bail!("no regional_redis_client");
};

if let Err(e) = regional_redis_client.connect().await {
bail!("error connecting to redis: {:?}", e);
}

let mut last_id = "0-0".to_string();
batch_read(
&mut regional_redis_client,
endpoint_cache_config.stream_name,
StreamReadOptions::default().count(endpoint_cache_config.default_batch_size),
&mut last_id,
true,
|event| {
let json = serde_json::to_string(&event)?;
println!("{}", json);
Ok(())
},
)
.await?;

Ok(())
}

// TODO: this could be an enum, but events in Redis need to be fixed first.
// ProjectCreated was sent with type:branch_created. So we ignore type.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct CPlaneEvent {
id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
endpoint_created: Option<EndpointCreated>,
#[serde(skip_serializing_if = "Option::is_none")]
branch_created: Option<BranchCreated>,
#[serde(skip_serializing_if = "Option::is_none")]
project_created: Option<ProjectCreated>,

#[serde(rename = "type")]
_type: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct EndpointCreated {
endpoint_id: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct BranchCreated {
branch_id: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct ProjectCreated {
project_id: String,
}

impl TryFrom<&Value> for CPlaneEvent {
type Error = anyhow::Error;
fn try_from(value: &Value) -> Result<Self, Self::Error> {
let json = String::from_redis_value(value)?;
Ok(serde_json::from_str(&json)?)
}
}

async fn batch_read(
conn: &mut ConnectionWithCredentialsProvider,
stream_name: String,
opts: StreamReadOptions,
last_id: &mut String,
return_when_finish: bool,
mut insert_event: impl FnMut(CPlaneEvent) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let mut total: usize = 0;
loop {
let mut res: StreamReadReply = conn
.xread_options(&[&stream_name], &[last_id.as_str()], &opts)
.await?;

if res.keys.is_empty() {
if return_when_finish {
if total != 0 {
break;
}
anyhow::bail!(
"Redis stream {} is empty, cannot be used to filter endpoints",
stream_name
);
}
// If we are not returning when finish, we should wait for more data.
continue;
}
if res.keys.len() != 1 {
anyhow::bail!("Cannot read from redis stream {}", stream_name);
}

let key = res.keys.pop().expect("Checked length above");

for stream_id in key.ids {
total += 1;
for value in stream_id.map.values() {
match value.try_into() {
Ok::<CPlaneEvent, _>(mut event) => {
event.id = Some(stream_id.id.clone());
insert_event(event)?;
}
Err(err) => {
tracing::error!("error parsing value {value:?}: {err:?}");
}
};
}
if total.is_power_of_two() {
tracing::debug!("endpoints read {}", total);
}
*last_id = stream_id.id;
}
}
tracing::info!("read {} endpoints/branches/projects from redis", total);
Ok(())
}
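A standalone sketch of the event shape handled above, using a trimmed copy of CPlaneEvent rather than the proxy crate (serde, serde_json, and anyhow are assumed as dependencies, matching the file itself): the optional payload fields decide what the event is, the "type" tag is carried but never used for dispatch, and skip_serializing_if keeps absent variants out of the re-serialized JSON.

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct CPlaneEvent {
    id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    project_created: Option<ProjectCreated>,
    #[serde(skip_serializing_if = "Option::is_none")]
    branch_created: Option<BranchCreated>,
    #[serde(rename = "type")]
    _type: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
struct ProjectCreated {
    project_id: String,
}

#[derive(Serialize, Deserialize, Debug)]
struct BranchCreated {
    branch_id: String,
}

fn main() -> anyhow::Result<()> {
    // A project_created payload that arrives with a misleading type tag.
    let raw = r#"{"project_created":{"project_id":"p-123"},"type":"branch_created"}"#;
    let mut event: CPlaneEvent = serde_json::from_str(raw)?;
    event.id = Some("1731421159-0".to_string()); // filled in from the Redis stream id

    // The payload field, not the tag, identifies the event.
    assert!(event.project_created.is_some());
    assert_eq!(event._type.as_deref(), Some("branch_created"));

    // On output, skip_serializing_if drops the absent branch_created field entirely.
    println!("{}", serde_json::to_string(&event)?);
    Ok(())
}
```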
@@ -80,7 +80,7 @@ impl ConnectionWithCredentialsProvider {
redis::cmd("PING").query_async(con).await
}

pub(crate) async fn connect(&mut self) -> anyhow::Result<()> {
pub async fn connect(&mut self) -> anyhow::Result<()> {
let _guard = self.mutex.lock().await;
if let Some(con) = self.con.as_mut() {
match Self::ping(con).await {
@@ -262,7 +262,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {

// Send requests.
for req in reqgen {
while reply_rx.try_recv().is_ok() {} // discard replies, to avoid blocking
_ = reply_rx.try_recv(); // discard any replies, to avoid blocking
let msg = ProposerAcceptorMessage::AppendRequest(req);
msg_tx.send(msg).await.expect("send failed");
}
@@ -217,8 +217,7 @@ pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
buckets.insert(0, 0.0);
buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
// TODO: tweak this.
assert!(buckets.len() <= 16, "too many histogram buckets");
assert!(buckets.len() <= 12, "too many histogram buckets");

register_histogram!(
"safekeeper_wal_receiver_queue_depth",
@@ -7,15 +7,14 @@ use crate::metrics::{
WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
WAL_RECEIVER_QUEUE_SIZE_TOTAL,
};
use crate::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
ServerInfo,
};
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
use crate::timeline::WalResidentTimeline;
use crate::wal_service::ConnectionId;
use crate::GlobalTimelines;
use anyhow::{anyhow, Context};
use bytes::{BufMut as _, Bytes, BytesMut};
use bytes::BytesMut;
use parking_lot::MappedMutexGuard;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
@@ -207,8 +206,7 @@ impl Drop for WalReceiverGuard {
}
}

// TODO: reconsider this.
pub const MSG_QUEUE_SIZE: usize = 4096;
pub const MSG_QUEUE_SIZE: usize = 256;
pub const REPLY_QUEUE_SIZE: usize = 16;

impl SafekeeperPostgresHandler {
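The histogram assert earlier drops from 16 to 12 buckets because MSG_QUEUE_SIZE drops from 4096 to 256 here. A back-of-the-envelope check of that arithmetic follows; pow2_buckets is a local stand-in that is assumed to produce the powers of two from 1 up to the queue size, since the real helper is not shown in this diff.

```rust
// Assumed behavior of the metrics helper: powers of two from `start` to `end` inclusive.
fn pow2_buckets(start: usize, end: usize) -> Vec<f64> {
    let mut buckets = Vec::new();
    let mut x = start;
    while x <= end {
        buckets.push(x as f64);
        x *= 2;
    }
    buckets
}

// Mirrors the bucket construction in the metrics hunk above.
fn queue_depth_buckets(msg_queue_size: usize) -> Vec<f64> {
    let mut buckets = pow2_buckets(1, msg_queue_size);
    buckets.insert(0, 0.0);
    buckets.insert(buckets.len() - 1, (msg_queue_size - 1) as f64);
    buckets
}

fn main() {
    // MSG_QUEUE_SIZE = 256: 0, 1, 2, 4, ..., 128, 255, 256 -> 11 buckets, within the new limit of 12.
    assert_eq!(queue_depth_buckets(256).len(), 11);
    // MSG_QUEUE_SIZE = 4096: 15 buckets, which is why the old limit was 16.
    assert_eq!(queue_depth_buckets(4096).len(), 15);
}
```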
@@ -486,9 +484,6 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
const METRICS_INTERVAL: Duration = Duration::from_secs(5);

/// The AppendRequest buffer size.
const APPEND_BUFFER_SIZE: usize = 1024 * 1024;

/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx.
///
@@ -535,9 +530,6 @@ impl WalAcceptor {
async fn run(&mut self) -> anyhow::Result<()> {
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);

// Buffer AppendRequests to submit them as a single large write.
let mut append_buf = BufferedAppendRequest::new(APPEND_BUFFER_SIZE);

// Periodically flush the WAL and compute metrics.
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
@@ -554,7 +546,7 @@ impl WalAcceptor {
// Process inbound message.
msg = self.msg_rx.recv() => {
// If disconnected, break to flush WAL and return.
let Some(msg) = msg else {
let Some(mut msg) = msg else {
break;
};
@@ -571,44 +563,11 @@ impl WalAcceptor {
// This batches multiple appends per fsync. If the channel is empty after
// sending the reply, we'll schedule an immediate flush.
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
// Try to batch multiple messages into a single large write.
if !append_buf.is_empty() || !self.msg_rx.is_empty() {
if append_buf.add(&append_request) {
continue; // message buffered, go get next message
}

// Full buffer, write it and buffer this message for next iteration.
dirty = true;
let buf_req = append_buf.take().expect("empty buffer");
let buf_msg = ProposerAcceptorMessage::NoFlushAppendRequest(buf_req);
let reply = self.tli.process_msg(&buf_msg).await?;
drop(buf_msg); // allow reusing buffer for add
assert!(append_buf.add(&append_request), "empty buffer rejected msg");
reply
} else {
dirty = true;
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
self.tli.process_msg(&msg).await?
}
} else {
self.tli.process_msg(&msg).await?
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
dirty = true;
}
}

// If there are no pending messages, write the append buffer.
//
// NB: we don't also flush the WAL here. Otherwise we can get into a regime where we
// quickly drain msg_rx and fsync before the sender is able to repopulate msg_rx.
// This happens consistently due to Tokio scheduling, leading to overeager fsyncing.
// Instead, we perform the write without fsyncing and give the sender a chance to
// get scheduled and populate msg_rx for the next iteration. If there are no further
// messages, the next iteration will flush the WAL.
_ = future::ready(()), if self.msg_rx.is_empty() && !append_buf.is_empty() => {
dirty = true;
let buf_req = append_buf.take().expect("empty buffer");
self.tli
.process_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(buf_req))
.await?
self.tli.process_msg(&msg).await?
}

// While receiving AppendRequests, flush the WAL periodically and respond with an
@@ -620,11 +579,11 @@ impl WalAcceptor {
.await?
}

// If there are no pending messages, flush the WAL and append buffer immediately.
// If there are no pending messages, flush the WAL immediately.
//
// TODO: this should be done via flush_ticker.reset_immediately(), but that's always
// delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
_ = future::ready(()), if self.msg_rx.is_empty() && dirty => {
_ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
dirty = false;
flush_ticker.reset();
self.tli
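A minimal model of the flush scheduling that the comments in these hunks describe; this is an illustrative tokio sketch, not the safekeeper's WalAcceptor, and it assumes a tokio version providing mpsc::Receiver::is_empty (which the diff itself relies on). Appends are "written" without fsync while more messages are pending, and a flush happens only once the inbox drains or the periodic ticker fires.

```rust
use tokio::sync::mpsc;
use tokio::time::{interval, Duration, MissedTickBehavior};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(256);

    // Simulate a burst of append requests from the sender side.
    tokio::spawn(async move {
        for n in 0..10u64 {
            tx.send(n).await.unwrap();
        }
    });

    let mut flush_ticker = interval(Duration::from_secs(1));
    flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    let mut dirty = false;

    loop {
        tokio::select! {
            msg = rx.recv() => {
                let Some(msg) = msg else { break };
                // "Write without fsync": just mark the state dirty.
                println!("append {msg} (no flush yet)");
                dirty = true;
            }
            // Periodic flush while appends keep arriving.
            _ = flush_ticker.tick(), if dirty => {
                println!("periodic flush");
                dirty = false;
            }
            // Immediate flush once the inbox is empty, giving the sender a chance
            // to refill the queue before the next fsync.
            _ = std::future::ready(()), if dirty && rx.is_empty() => {
                println!("idle flush");
                dirty = false;
                flush_ticker.reset();
            }
        }
    }
}
```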
@@ -668,115 +627,3 @@ impl Drop for WalAcceptor {
}
}
}

/// Buffers WAL data for multiple AppendRequests, to submit them as a single write.
struct BufferedAppendRequest {
/// The buffer capacity.
capacity: usize,
/// The buffered header and WAL data.
buf: Option<(AppendRequestHeader, BytesMut)>,
/// A previous buffer that can be reused when the returned message is dropped.
reuse_buf: Option<Bytes>,
/// If an AppendRequest is larger than the buffer capacity (when empty), just stash it here to
/// avoid growing the buffer and copying it. This will be returned as-is.
large: Option<AppendRequest>,
}

impl BufferedAppendRequest {
/// Creates a new append request buffer with the given capacity.
fn new(capacity: usize) -> Self {
Self {
capacity,
buf: None,
reuse_buf: None,
large: None,
}
}

/// Adds the given append request to the buffer, if possible. Returns `false` if the message
/// can't be buffered, leaving self unmodified. An empty buffer will always accept a message.
///
/// If the buffer is not empty, the message must have the same term and proposer and contiguous
/// `begin_lsn` and `end_lsn`. The buffer must have available capacity for the entire
/// `wal_data`. If the message is greater than an empty buffer's capacity, it is accepted but
/// simply stashed away in `large` without growing the buffer.
pub fn add(&mut self, msg: &AppendRequest) -> bool {
// If there is a stashed large message, reject further messages.
if self.large.is_some() {
return false;
}

// If there is no existing buffer, initialize one with the message.
let Some((ref mut h, ref mut wal_data)) = self.buf else {
// If the message is larger than the buffer capacity, just stash it instead of growing.
if msg.wal_data.len() > self.capacity {
assert!(self.large.is_none());
self.large = Some(msg.clone()); // clone is cheap with Bytes
return true;
}

// Reuse a previous buffer, if any, or allocate a new one.
//
// TODO: try_into_mut() is essentially runtime borrow checking. If AppendRequest used a
// normal Vec<u8> we could do compile-time borrow checking instead and avoid panic.
let mut wal_data = match self.reuse_buf.take() {
Some(reuse_buf) => match reuse_buf.try_into_mut() {
Ok(mut reuse_buf) => {
assert_eq!(reuse_buf.capacity(), self.capacity);
reuse_buf.clear();
reuse_buf
}
Err(_) => panic!("couldn't reuse buffer, still in use"),
},
None => BytesMut::with_capacity(self.capacity),
};
// Copy the append request into the buffer.
wal_data.put_slice(&msg.wal_data);
self.buf = Some((msg.h, wal_data));
return true;
};

// The messages must have the same term and proposer.
if h.term != msg.h.term || h.proposer_uuid != msg.h.proposer_uuid {
return false;
}
// The messages must be contiguous.
if h.end_lsn != msg.h.begin_lsn {
return false;
}
// The message must fit in the buffer.
if wal_data.len() + msg.wal_data.len() > self.capacity {
return false;
}

// Add the message to the buffer, bumping the commit and truncate LSNs. We assume that later
// messages have later commit/truncate LSNs.
h.end_lsn = msg.h.end_lsn;
h.commit_lsn = msg.h.commit_lsn;
h.truncate_lsn = msg.h.truncate_lsn;
wal_data.put_slice(&msg.wal_data);
true
}

/// Returns true if there is no buffered message.
fn is_empty(&self) -> bool {
self.buf.is_none() && self.large.is_none()
}

/// Takes the buffered AppendRequest (if any), leaving a None in its place.
///
/// NB: The returned `wal_data` Bytes must be dropped before the next call to `add()`, in order
/// to reuse the buffer. This is basically runtime borrow checking, because of Bytes.
fn take(&mut self) -> Option<AppendRequest> {
// If there is a stashed large message, return it.
if let Some(large) = self.large.take() {
assert!(self.buf.is_none(), "both buf and large are set");
return Some(large);
}

let (h, wal_data) = self.buf.take()?;
let wal_data = wal_data.freeze();
self.reuse_buf = Some(wal_data.clone()); // keep a reference to the buffer
Some(AppendRequest { h, wal_data })
}
}
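take() hands out a Bytes whose allocation is remembered in reuse_buf, and that Bytes must be dropped before the next add(). The sketch below isolates just that reuse trick; it assumes a bytes crate version that provides Bytes::try_into_mut, which the code above already uses. Converting back to a writable BytesMut only succeeds once every other reference to the allocation is gone.

```rust
use bytes::{Bytes, BytesMut};

fn main() {
    let mut buf = BytesMut::with_capacity(1024);
    buf.extend_from_slice(b"some wal data");

    // Freezing is what take() does; the extra clone plays the role of reuse_buf.
    let frozen: Bytes = buf.freeze();
    let reuse_handle = frozen.clone();

    // While `frozen` (the Bytes handed out to the caller) is alive, the allocation
    // is shared, so reclaiming it as a mutable buffer is refused.
    assert!(reuse_handle.clone().try_into_mut().is_err());

    // Once the caller drops its Bytes...
    drop(frozen);

    // ...the sole remaining handle can be turned back into a BytesMut and reused,
    // which is exactly what the next add() relies on (and why it panics otherwise).
    let mut reusable = reuse_handle.try_into_mut().expect("sole owner now");
    reusable.clear();
    println!("reclaimed a buffer with capacity {}", reusable.capacity());
}
```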
@@ -296,13 +296,12 @@ pub struct ProposerElected {

/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct AppendRequest {
pub h: AppendRequestHeader,
pub wal_data: Bytes,
}

#[derive(Debug, Clone, Copy, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
@@ -1167,7 +1166,7 @@ mod tests {
proposer_uuid: [0; 16],
};
let mut append_request = AppendRequest {
h: ar_hdr,
h: ar_hdr.clone(),
wal_data: Bytes::from_static(b"b"),
};

@@ -1241,7 +1240,7 @@ mod tests {
proposer_uuid: [0; 16],
};
let append_request = AppendRequest {
h: ar_hdr,
h: ar_hdr.clone(),
wal_data: Bytes::from_static(b"b"),
};

@@ -1249,7 +1248,7 @@ mod tests {
sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
.await
.unwrap();
let mut ar_hrd2 = ar_hdr;
let mut ar_hrd2 = ar_hdr.clone();
ar_hrd2.begin_lsn = Lsn(4);
ar_hrd2.end_lsn = Lsn(5);
let append_request = AppendRequest {
@@ -127,23 +127,29 @@ pub struct PhysicalStorage {
/// - doesn't point to the end of the segment
file: Option<File>,

/// When false, we have just initialized storage using the LSN from find_end_of_wal().
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
/// there can be a case with unexpected .partial file.
/// When true, WAL truncation potentially has been interrupted and we need
/// to finish it before allowing WAL writes; see truncate_wal for details.
/// In this case [`write_lsn`] can be less than actually written WAL on
/// disk. In particular, there can be a case with unexpected .partial file.
///
/// Imagine the following:
/// - 000000010000000000000001
/// - it was fully written, but the last record is split between 2 segments
/// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
/// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
/// - it was fully written, but the last record is split between 2
/// segments
/// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in
/// the end of this segment
/// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were
/// initialized to 0/1FFFFF0
/// - 000000010000000000000002.partial
/// - it has only 1 byte written, which is not enough to make a full WAL record
/// - it has only 1 byte written, which is not enough to make a full WAL
/// record
///
/// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
/// This flag will be set to true after the first truncate_wal() call.
/// Partial segment 002 has no WAL records, and it will be removed by the
/// next truncate_wal(). This flag will be set to true after the first
/// truncate_wal() call.
///
/// [`write_lsn`]: Self::write_lsn
is_truncated_after_restart: bool,
pending_wal_truncation: bool,
}

impl PhysicalStorage {
@@ -208,7 +214,7 @@ impl PhysicalStorage {
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
file: None,
is_truncated_after_restart: false,
pending_wal_truncation: true,
})
}

@@ -405,6 +411,13 @@ impl Storage for PhysicalStorage {
startpos
);
}
if self.pending_wal_truncation {
bail!(
"write_wal called with pending WAL truncation, write_lsn={}, startpos={}",
self.write_lsn,
startpos
);
}

let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
// WAL is written, updating write metrics
@@ -479,15 +492,34 @@ impl Storage for PhysicalStorage {
);
}

// Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
// disk (this happens on each connect).
if self.is_truncated_after_restart
// Quick exit if nothing to do and we know that the state is clean to
// avoid writing up to 16 MiB of zeros on disk (this happens on each
// connect).
if !self.pending_wal_truncation
&& end_pos == self.write_lsn
&& end_pos == self.flush_record_lsn
{
return Ok(());
}

// Atomicity: we start with LSNs reset because once on disk deletion is
// started it can't be reversed. However, we might crash/error in the
// middle, leaving garbage above the truncation point. In theory,
// concatenated with previous records it might form bogus WAL (though
// very unlikely in practice because CRC would guard from that). To
// protect, set pending_wal_truncation flag before beginning: it means
// truncation must be retried and WAL writes are prohibited until it
// succeeds. Flag is also set on boot because we don't know if the last
// state was clean.
//
// Protocol (HandleElected before first AppendRequest) ensures we'll
// always try to ensure clean truncation before any writes.
self.pending_wal_truncation = true;

self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.flush_record_lsn = end_pos;

// Close previously opened file, if any
if let Some(unflushed_file) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?;
@@ -513,11 +545,7 @@ impl Storage for PhysicalStorage {
fs::rename(wal_file_path, wal_file_partial_path).await?;
}

// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.flush_record_lsn = end_pos;
self.is_truncated_after_restart = true;
self.pending_wal_truncation = false;
Ok(())
}
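A minimal model of the protocol those comments describe; this is an assumption-level sketch, not the safekeeper's PhysicalStorage. The pending flag is set before any destructive truncation work, WAL writes are refused while it is set, it is cleared only once truncation completes, and the boot state is conservatively marked pending.

```rust
struct WalStore {
    write_lsn: u64,
    pending_wal_truncation: bool,
}

impl WalStore {
    fn new() -> Self {
        // On boot we don't know whether the last truncation finished cleanly.
        Self { write_lsn: 0, pending_wal_truncation: true }
    }

    fn write_wal(&mut self, startpos: u64, len: u64) -> Result<(), String> {
        if self.pending_wal_truncation {
            return Err(format!(
                "write_wal called with pending WAL truncation, write_lsn={}, startpos={}",
                self.write_lsn, startpos
            ));
        }
        self.write_lsn = startpos + len;
        Ok(())
    }

    fn truncate_wal(&mut self, end_pos: u64) -> Result<(), String> {
        // Mark the truncation in progress before touching anything on disk; a crash
        // or error here leaves the flag set, so truncation is retried before any
        // further writes are allowed.
        self.pending_wal_truncation = true;
        self.write_lsn = end_pos;
        // ... segment removal/renaming would happen here ...
        self.pending_wal_truncation = false;
        Ok(())
    }
}

fn main() {
    let mut store = WalStore::new();
    assert!(store.write_wal(0, 8192).is_err()); // boot state: truncation still pending
    store.truncate_wal(0).unwrap();
    store.write_wal(0, 8192).unwrap();
}
```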
@@ -5,6 +5,8 @@ from typing import TYPE_CHECKING, cast, final

import requests

from fixtures.log_helper import log

if TYPE_CHECKING:
from typing import Any, Literal, Optional

@@ -30,7 +32,11 @@ class NeonAPI:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"

return requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text)
resp.raise_for_status()

return resp

def create_project(
self,
@@ -66,8 +72,6 @@ class NeonAPI:
json=data,
)

assert resp.status_code == 201

return cast("dict[str, Any]", resp.json())

def get_project_details(self, project_id: str) -> dict[str, Any]:
@@ -79,7 +83,7 @@ class NeonAPI:
"Content-Type": "application/json",
},
)
assert resp.status_code == 200

return cast("dict[str, Any]", resp.json())

def delete_project(
@@ -95,8 +99,6 @@ class NeonAPI:
},
)

assert resp.status_code == 200

return cast("dict[str, Any]", resp.json())

def start_endpoint(
@@ -112,8 +114,6 @@ class NeonAPI:
},
)

assert resp.status_code == 200

return cast("dict[str, Any]", resp.json())

def suspend_endpoint(
@@ -129,8 +129,6 @@ class NeonAPI:
},
)

assert resp.status_code == 200

return cast("dict[str, Any]", resp.json())

def restart_endpoint(
@@ -146,8 +144,6 @@ class NeonAPI:
},
)

assert resp.status_code == 200

return cast("dict[str, Any]", resp.json())

def create_endpoint(
@@ -178,8 +174,6 @@ class NeonAPI:
json=data,
)

assert resp.status_code == 201

return cast("dict[str, Any]", resp.json())

def get_connection_uri(
@@ -206,8 +200,6 @@ class NeonAPI:
},
)

assert resp.status_code == 200

return cast("dict[str, Any]", resp.json())

def get_branches(self, project_id: str) -> dict[str, Any]:
@@ -219,8 +211,6 @@ class NeonAPI:
},
)

assert resp.status_code == 200

return cast("dict[str, Any]", resp.json())

def get_endpoints(self, project_id: str) -> dict[str, Any]:
@@ -232,8 +222,6 @@ class NeonAPI:
},
)

assert resp.status_code == 200

return cast("dict[str, Any]", resp.json())

def get_operations(self, project_id: str) -> dict[str, Any]:
@@ -246,8 +234,6 @@ class NeonAPI:
},
)

assert resp.status_code == 200

return cast("dict[str, Any]", resp.json())

def wait_for_operation_to_finish(self, project_id: str):
@@ -35,9 +35,10 @@ from fixtures.pageserver.utils import (
wait_for_upload,
)
from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
)
from fixtures.utils import wait_until
from fixtures.utils import run_only_on_default_postgres, wait_until
from fixtures.workload import Workload

if TYPE_CHECKING:
@@ -728,3 +729,68 @@ def test_upgrade_generationless_local_file_paths(
)
# We should download into the same local path we started with
assert os.path.exists(victim_path)


@run_only_on_default_postgres("Only tests index logic")
def test_old_index_time_threshold(
neon_env_builder: NeonEnvBuilder,
):
"""
Exercise pageserver's detection of trying to load an ancient non-latest index.
(see https://github.com/neondatabase/neon/issues/6951)
"""

# Run with local_fs because we will interfere with mtimes by local filesystem access
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(32)

# Remember generation 1's index path
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
index_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id)

# Increment generation by detaching+attaching, and write+flush some data to get a new remote index
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}})
env.storage_controller.reconcile_until_idle()
workload.churn_rows(32)

# A new index should have been written
assert env.pageserver_remote_storage.index_path(tenant_id, timeline_id) != index_path

# Hack the mtime on the generation 1 index
log.info(f"Setting old mtime on {index_path}")
os.utime(index_path, times=(time.time(), time.time() - 30 * 24 * 3600))
env.pageserver.allowed_errors.extend(
[
".*Found a newer index while loading an old one.*",
".*Index age exceeds threshold and a newer index exists.*",
]
)

# Detach from storage controller + attach in an old generation directly on the pageserver.
workload.stop()
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
env.storage_controller.reconcile_until_idle()
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Stop"})
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy")

# The controller would not do this (attach in an old generation): we are doing it to simulate
# a hypothetical profound bug in the controller.
env.pageserver.http_client().tenant_location_conf(
tenant_id, {"generation": 1, "mode": "AttachedSingle", "tenant_conf": {}}
)

# The pageserver should react to this situation by refusing to attach the tenant and putting
# it into Broken state
env.pageserver.allowed_errors.append(".*tenant is broken.*")
with pytest.raises(
PageserverApiException,
match="tenant is broken: Index age exceeds threshold and a newer index exists",
):
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)
Submodule vendor/postgres-v14 (vendored) updated: de0a000daf...c5e0d642ef
Submodule vendor/postgres-v15 (vendored) updated: fd631a9590...1feff6b60f
Submodule vendor/postgres-v16 (vendored) updated: 03b43900ed...b0b693ea29
Submodule vendor/postgres-v17 (vendored) updated: ae4cc30dba...aa2e29f2b6

vendor/revisions.json (vendored): 16 lines changed
@@ -1,18 +1,18 @@
{
"v17": [
"17.0",
"ae4cc30dba24f3910533e5a48e8103c3f2fff300"
"17.1",
"aa2e29f2b6952140dfe51876bbd11054acae776f"
],
"v16": [
"16.4",
"03b43900edc5d8d6eecec460bfc89aec7174bd84"
"16.5",
"b0b693ea298454e95e6b154780d1fd586a244dfd"
],
"v15": [
"15.8",
"fd631a959049dfe2b82f67409c8b8b0d3e0016d1"
"15.9",
"1feff6b60f07cb71b665d0f5ead71a4320a71743"
],
"v14": [
"14.13",
"de0a000dafc2e66ce2e39282d3aa1c704fe0390e"
"14.14",
"c5e0d642efb02e4bfedc283b0a7707fe6c79cc89"
]
}