Compare commits

..

2 Commits

Author SHA1 Message Date
John Spray
890003f97f Rename SegmentSize -> WalSegmentSize 2025-06-25 14:45:52 +01:00
John Spray
68491147f5 libs: introduce SegmentSize type
Fixes: https://github.com/neondatabase/neon/issues/612
2025-06-25 14:43:12 +01:00
84 changed files with 559 additions and 1392 deletions

2
Cargo.lock generated
View File

@@ -6815,7 +6815,6 @@ dependencies = [
"hex",
"http-utils",
"humantime",
"humantime-serde",
"hyper 0.14.30",
"itertools 0.10.5",
"json-structural-diff",
@@ -6826,7 +6825,6 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"postgres_connection",
"posthog_client_lite",
"rand 0.8.5",
"regex",
"reqwest",

View File

@@ -12,7 +12,6 @@ use std::{env, fs};
use anyhow::{Context, bail};
use clap::ValueEnum;
use pageserver_api::config::PostHogConfig;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::{Certificate, Url};
@@ -214,8 +213,6 @@ pub struct NeonStorageControllerConf {
pub timeline_safekeeper_count: Option<i64>,
pub posthog_config: Option<PostHogConfig>,
pub kick_secondary_downloads: Option<bool>,
}
@@ -248,7 +245,6 @@ impl Default for NeonStorageControllerConf {
use_https_safekeeper_api: false,
use_local_compute_notifications: true,
timeline_safekeeper_count: None,
posthog_config: None,
kick_secondary_downloads: None,
}
}

View File

@@ -143,8 +143,6 @@ impl PageServerNode {
overrides.push(format!("ssl_ca_file='{}'", ssl_ca_file.to_str().unwrap()));
}
overrides.push("dev_mode=true".to_owned());
// Apply the user-provided overrides
overrides.push({
let mut doc =

View File

@@ -161,7 +161,6 @@ impl SafekeeperNode {
listen_http,
"--availability-zone".to_owned(),
availability_zone,
"--dev".to_owned(),
];
if let Some(pg_tenant_only_port) = self.conf.pg_tenant_only_port {
let listen_pg_tenant_only = format!("{}:{}", self.listen_addr, pg_tenant_only_port);

View File

@@ -642,18 +642,6 @@ impl StorageController {
args.push(format!("--timeline-safekeeper-count={sk_cnt}"));
}
let mut envs = vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
];
if let Some(posthog_config) = &self.config.posthog_config {
envs.push((
"POSTHOG_CONFIG".to_string(),
serde_json::to_string(posthog_config)?,
));
}
println!("Starting storage controller");
background_process::start_process(
@@ -661,7 +649,10 @@ impl StorageController {
&instance_dir,
&self.env.storage_controller_bin(),
args,
envs,
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)),
&start_args.start_timeout,
|| async {

View File

@@ -65,7 +65,6 @@ services:
--id=$$SAFEKEEPER_ID
--broker-endpoint=$$BROKER_ENDPOINT
-D /data
--dev
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
@@ -96,7 +95,6 @@ services:
--id=$$SAFEKEEPER_ID
--broker-endpoint=$$BROKER_ENDPOINT
-D /data
--dev
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
@@ -127,7 +125,6 @@ services:
--id=$$SAFEKEEPER_ID
--broker-endpoint=$$BROKER_ENDPOINT
-D /data
--dev
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',

View File

@@ -6,4 +6,3 @@ remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region
control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address
control_plane_emergency_mode=true
virtual_file_io_mode="buffered" # the CI runners where we run the docker compose tests have slow disks
dev_mode=true

View File

@@ -1,396 +0,0 @@
# Memo: Endpoint Persistent Unlogged Files Storage
Created on 2024-11-05
Implemented on N/A
## Summary
A design for a storage system that allows storage of files required to make
Neon's Endpoints have a better experience at or after a reboot.
## Motivation
Several systems inside PostgreSQL (and Neon) need some persistent storage for
optimal workings across reboots and restarts, but still work without.
Examples are the query-level statistics files of `pg_stat_statements` in
`pg_stat/pg_stat_statements.stat`, and `pg_prewarm`'s `autoprewarm.blocks`.
We need a storage system that can store and manage these files for each
Endpoint, without necessarily granting users access to an unlimited storage
device.
## Goals
- Store known files for Endpoints with reasonable persistence.
_Data loss in this service, while annoying and bad for UX, won't lose any
customer's data._
## Non Goals (if relevant)
- This storage system does not need branching, file versioning, or other such
features. The files are as ephemeral to the timeline of the data as the
Endpoints that host the data.
- This storage system does not need to store _all_ user files, only 'known'
user files.
- This storage system does not need to be hosted fully inside Computes.
_Instead, this will be a separate component similar to Pageserver,
SafeKeeper, the S3 proxy used for dynamically loaded extensions, etc._
## Impacted components
- Compute needs new code to load and store these files in its lifetime.
- Control Plane needs to consider this new storage system when signalling
the deletion of an Endpoint, Timeline, or Tenant.
- Control Plane needs to consider this new storage system when it resets
or re-assigns an endpoint's timeline/branch state.
A new service is created: the Endpoint Persistent Unlogged Files Storage
service. This could be integrated in e.g. Pageserver or Control Plane, or a
separately hosted service.
## Proposed implementation
Endpoint-related data files are managed by a newly designed service (which
optionally is integrated in an existing service like Pageserver or Control
Plane), which stores data directly into S3 or any blob storage of choice.
Upon deletion of the Endpoint, or reassignment of the endpoint to a different
branch, this ephemeral data is dropped: the data stored may not match the
state of the branch's data after reassignment, and on endpoint deletion the
data won't have any use to the user.
Compute gets credentials (JWT token with Tenant, Timeline & Endpoint claims)
which it can use to authenticate to this new service and retrieve and store
data associated with this endpoint. This limited scope reduces leaks of data
across endpoints and timeline resets, and limits the ability of endpoints to
mess with other endpoints' data.
The path of this endpoint data in S3 is initially as follows:
s3://<regional-epufs-bucket>/
tenants/
<hex-tenant-id>/
tenants/
<hex-timeline-id>/
endpoints/
<endpoint-id>/
pgdata/
<file_path_in_pgdatadir>
For other blob storages an equivalent or similar path can be constructed.
### Reliability, failure modes and corner cases (if relevant)
Reliability is important, but not critical to the workings of Neon. The data
stored in this service will, when lost, reduce performance, but won't be a
cause of permanent data loss - only operational metadata is stored.
Most, if not all, blob storage services have sufficiently high persistence
guarantees to cater our need for persistence and uptime. The only concern with
blob storages is that the access latency is generally higher than local disk,
but for the object types stored (cache state, ...) I don't think this will be
much of an issue.
### Interaction/Sequence diagram (if relevant)
In these diagrams you can replace S3 with any persistent storage device of
choice, but S3 is chosen as representative name: The well-known and short name
of AWS' blob storage. Azure Blob Storage should work too, but it has a much
longer name making it less practical for the diagrams.
Write data:
```http
POST /tenants/<tenant-id>/timelines/<tl-id>/endpoints/<endpoint-id>/pgdata/<the-pgdata-path>
Host: epufs.svc.neon.local
<<<
200 OK
{
"version": "<opaque>", # opaque file version token, changes when the file contents change
"size": <bytes>,
}
```
```mermaid
sequenceDiagram
autonumber
participant co as Compute
participant ep as EPUFS
participant s3 as Blob Storage
co-->ep: Connect with credentials
co->>+ep: Store Unlogged Persistent File
opt is authenticated
ep->>s3: Write UPF to S3
end
ep->>-co: OK / Failure / Auth Failure
co-->ep: Cancel connection
```
Read data: (optional with cache-relevant request parameters, e.g. If-Modified-Since)
```http
GET /tenants/<tenant-id>/timelines/<tl-id>/endpoints/<endpoint-id>/pgdata/<the-pgdata-path>
Host: epufs.svc.neon.local
<<<
200 OK
<file data>
```
```mermaid
sequenceDiagram
autonumber
participant co as Compute
participant ep as EPUFS
participant s3 as Blob Storage
co->>+ep: Read Unlogged Persistent File
opt is authenticated
ep->>+s3: Request UPF from storage
s3->>-ep: Receive UPF from storage
end
ep->>-co: OK(response) / Failure(storage, auth, ...)
```
Compute Startup:
```mermaid
sequenceDiagram
autonumber
participant co as Compute
participant ps as Pageserver
participant ep as EPUFS
participant es as Extension server
note over co: Bind endpoint ep-xxx
par Get basebackup
co->>+ps: Request basebackup @ LSN
ps-)ps: Construct basebackup
ps->>-co: Receive basebackup TAR @ LSN
and Get startup-critical Unlogged Persistent Files
co->>+ep: Get all UPFs of endpoint ep-xxx
ep-)ep: Retrieve and gather all UPFs
ep->>-co: TAR of UPFs
and Get startup-critical extensions
loop For every startup-critical extension
co->>es: Get critical extension
es->>co: Receive critical extension
end
end
note over co: Start compute
```
CPlane ops:
```http
DELETE /tenants/<tenant-id>/timelines/<timeline-id>/endpoints/<endpoint-id>
Host: epufs.svc.neon.local
<<<
200 OK
{
"tenant": "<tenant-id>",
"timeline": "<timeline-id>",
"endpoint": "<endpoint-id>",
"deleted": {
"files": <count>,
"bytes": <count>,
},
}
```
```http
DELETE /tenants/<tenant-id>/timelines/<timeline-id>
Host: epufs.svc.neon.local
<<<
200 OK
{
"tenant": "<tenant-id>",
"timeline": "<timeline-id>",
"deleted": {
"files": <count>,
"bytes": <count>,
},
}
```
```http
DELETE /tenants/<tenant-id>
Host: epufs.svc.neon.local
<<<
200 OK
{
"tenant": "<tenant-id>",
"deleted": {
"files": <count>,
"bytes": <count>,
},
}
```
```mermaid
sequenceDiagram
autonumber
participant cp as Control Plane
participant ep as EPUFS
participant s3 as Blob Storage
alt Tenant deleted
cp-)ep: Tenant deleted
loop For every object associated with removed tenant
ep->>s3: Remove data of deleted tenant from Storage
end
opt
ep-)cp: Tenant cleanup complete
end
alt Timeline deleted
cp-)ep: Timeline deleted
loop For every object associated with removed timeline
ep->>s3: Remove data of deleted timeline from Storage
end
opt
ep-)cp: Timeline cleanup complete
end
else Endpoint reassigned or removed
cp->>+ep: Endpoint reassigned
loop For every object associated with reassigned/removed endpoint
ep->>s3: Remove data from Storage
end
ep->>-cp: Cleanup complete
end
```
### Scalability (if relevant)
Provisionally: As this service is going to be part of compute startup, this
service should be able to quickly respond to all requests. Therefore this
service is deployed to every AZ we host Computes in, and Computes communicate
(generally) only to the EPUFS endpoint of the AZ they're hosted in.
Local caching of frequently restarted endpoints' data or metadata may be
needed for best performance. However, due to the regional nature of stored
data but zonal nature of the service deployment, we should be careful when we
implement any local caching, as it is possible that computes in AZ 1 will
update data originally written and thus cached by AZ 2. Cache version tests
and invalidation is therefore required if we want to roll out caching to this
service, which is too broad a scope for an MVC. This is why caching is left
out of scope for this RFC, and should be considered separately after this RFC
is implemented.
### Security implications (if relevant)
This service must be able to authenticate users at least by Tenant ID,
Timeline ID and Endpoint ID. This will use the existing JWT infrastructure of
Compute, which will be upgraded to the extent needed to support Timeline- and
Endpoint-based claims.
The service requires unlimited access to (a prefix of) a blob storage bucket,
and thus must be hosted outside the Compute VM sandbox.
A service that generates pre-signed request URLs for Compute to download the
data from that URL is likely problematic, too: Compute would be able to write
unlimited data to the bucket, or exfiltrate this signed URL to get read/write
access to specific objects in this bucket, which would still effectively give
users access to the S3 bucket (but with improved access logging).
There may be a use case for transferring data associated with one endpoint to
another endpoint (e.g. to make one endpoint warm its caches with the state of
another endpoint), but that's not currently in scope, and specific needs may
be solved through out-of-line communication of data or pre-signed URLs.
### Unresolved questions (if relevant)
Caching of files is not in the implementation scope of the document, but
should at some future point be considered to maximize performance.
## Alternative implementation (if relevant)
Several ideas have come up to solve this issue:
### Use AUXfile
One prevalent idea was to WAL-log the files using our AUXfile mechanism.
Benefits:
+ We already have this storage mechanism
Demerits:
- It isn't available on read replicas
- Additional WAL will be consumed during shutdown and after the shutdown
checkpoint, which needs PG modifications to work without panics.
- It increases the data we need to manage in our versioned storage, thus
causing higher storage costs with higher retention due to duplication at
the storage layer.
### Sign URLs for read/write operations, instead of proxying them
Benefits:
+ The service can be implemented with a much reduced IO budget
Demerits:
- Users could get access to these signed credentials
- Not all blob storage services may implement URL signing
### Give endpoints each their own directly accessed block volume
Benefits:
+ Easier to integrate for PostgreSQL
Demerits:
- Little control on data size and contents
- Potentially problematic as we'd need to store data all across the pgdata
directory.
- EBS is not a good candidate
- Attaches in 10s of seconds, if not more; i.e. too cold to start
- Shared EBS volumes are a no-go, as you'd have to schedule the endpoint
with users of the same EBS volumes, which can't work with VM migration
- EBS storage costs are very high (>80$/kilotenant when using a
volume/tenant)
- EBS volumes can't be mounted across AZ boundaries
- Bucket per endpoint is unfeasible
- S3 buckets are priced at $20/month per 1k, which we could better spend
on developers.
- Allocating service accounts takes time (100s of ms), and service accounts
are a limited resource, too; so they're not a good candidate to allocate
on a per-endpoint basis.
- Giving credentials limited to prefix has similar issues as the pre-signed
URL approach.
- Bucket DNS lookup will fill DNS caches and put pressure on DNS lookup
much more than our current systems would.
- Volumes bound by hypervisor are unlikely
- This requires significant investment and increased software on the
hypervisor.
- It is unclear if we can attach volumes after boot, i.e. for pooled
instances.
### Put the files into a table
Benefits:
+ Mostly already available in PostgreSQL
Demerits:
- Uses WAL
- Can't be used after shutdown checkpoint
- Needs a RW endpoint, and table & catalog access to write to this data
- Gets hit with DB size limitations
- Depending on user acces:
- Inaccessible:
The user doesn't have control over database size caused by
these systems.
- Accessible:
The user can corrupt these files and cause the system to crash while
user-corrupted files are present, thus increasing on-call overhead.
## Definition of Done (if relevant)
This project is done if we have:
- One S3 bucket equivalent per region, which stores this per-endpoint data.
- A new service endpoint in at least every AZ, which indirectly grants
endpoints access to the data stored for these endpoints in these buckets.
- Compute writes & reads temp-data at shutdown and startup, respectively, for
at least the pg_prewarm or lfc_prewarm state files.
- Cleanup of endpoint data is triggered when the endpoint is deleted or is
detached from its current timeline.

View File

@@ -63,8 +63,7 @@ impl Display for NodeMetadata {
}
}
/// PostHog integration config. This is used in pageserver, storcon, and neon_local.
/// Ensure backward compatibility when adding new fields.
/// PostHog integration config.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
/// PostHog project ID
@@ -77,10 +76,7 @@ pub struct PostHogConfig {
pub private_api_url: String,
/// Public API URL
pub public_api_url: String,
/// Refresh interval for the feature flag spec.
/// The storcon will push the feature flag spec to the pageserver. If the pageserver does not receive
/// the spec for `refresh_interval`, it will fetch the spec from the PostHog API.
#[serde(default)]
/// Refresh interval for the feature flag spec
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub refresh_interval: Option<Duration>,
@@ -371,9 +367,6 @@ pub struct BasebackupCacheConfig {
// TODO(diko): support max_entry_size_bytes.
// pub max_entry_size_bytes: u64,
pub max_size_entries: usize,
/// Size of the channel used to send prepare requests to the basebackup cache worker.
/// If exceeded, new prepare requests will be dropped.
pub prepare_channel_size: usize,
}
impl Default for BasebackupCacheConfig {
@@ -383,7 +376,6 @@ impl Default for BasebackupCacheConfig {
max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB
// max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB
max_size_entries: 1000,
prepare_channel_size: 100,
}
}
}

View File

@@ -6,7 +6,7 @@ use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_versioninfo::PgMajorVersion;
use pprof::criterion::{Output, PProfProfiler};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
const KB: usize = 1024;
@@ -22,23 +22,26 @@ criterion_main!(benches);
fn bench_complete_record(c: &mut Criterion) {
let mut g = c.benchmark_group("complete_record");
for size in [64, KB, 8 * KB, 128 * KB] {
let value_size = size as WalSegmentSize;
// Kind of weird to change the group throughput per benchmark, but it's the only way
// to vary it per benchmark. It works.
g.throughput(criterion::Throughput::Bytes(size as u64));
g.bench_function(format!("size={size}"), |b| run_bench(b, size).unwrap());
g.throughput(criterion::Throughput::Bytes(value_size as u64));
g.bench_function(format!("size={size}"), |b| {
run_bench(b, value_size).unwrap()
});
}
fn run_bench(b: &mut Bencher, size: usize) -> anyhow::Result<()> {
fn run_bench(b: &mut Bencher, size: WalSegmentSize) -> anyhow::Result<()> {
const PREFIX: &CStr = c"";
let value_size = LogicalMessageGenerator::make_value_size(size, PREFIX);
let value = vec![1; value_size];
let value = vec![1; value_size as usize];
let mut decoder = WalStreamDecoder::new(Lsn(0), PgMajorVersion::PG17);
let msg = LogicalMessageGenerator::new(PREFIX, &value)
.next()
.unwrap()
.encode(Lsn(0));
assert_eq!(msg.len(), size);
assert_eq!(msg.len(), size as usize);
b.iter(|| {
let msg = msg.clone(); // Bytes::clone() is cheap

View File

@@ -12,7 +12,7 @@
use bytes::Bytes;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
pub use postgres_versioninfo::PgMajorVersion;
@@ -241,7 +241,7 @@ pub use v14::xlog_utils::{
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
pub const XLOG_BLCKSZ: usize = 8192;
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const WAL_SEGMENT_SIZE: WalSegmentSize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;

View File

@@ -2,7 +2,7 @@ use std::ffi::{CStr, CString};
use bytes::{Bytes, BytesMut};
use crc32c::crc32c_append;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
use super::xlog_utils::{
@@ -39,7 +39,7 @@ impl Record {
// Construct the WAL record header.
let mut header = XLogRecord {
xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32,
xl_tot_len: XLOG_SIZE_OF_XLOG_RECORD + data_header.len() as WalSegmentSize + self.data.len() as WalSegmentSize,
xl_xid: 0,
xl_prev: prev_lsn.into(),
xl_info: self.info,
@@ -158,7 +158,7 @@ impl<R: RecordGenerator> WalGenerator<R> {
XLogLongPageHeaderData {
std: page_header,
xlp_sysid: Self::SYS_ID,
xlp_seg_size: WAL_SEGMENT_SIZE as u32,
xlp_seg_size: WAL_SEGMENT_SIZE,
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
}
.encode()
@@ -234,10 +234,10 @@ impl LogicalMessageGenerator {
/// Computes how large a value must be to get a record of the given size. Convenience method to
/// construct records of pre-determined size. Panics if the record size is too small.
pub fn make_value_size(record_size: usize, prefix: &CStr) -> usize {
pub fn make_value_size(record_size: WalSegmentSize, prefix: &CStr) -> WalSegmentSize {
let xlog_header_size = XLOG_SIZE_OF_XLOG_RECORD;
let lm_header_size = size_of::<XlLogicalMessage>();
let prefix_size = prefix.to_bytes_with_nul().len();
let lm_header_size = size_of::<XlLogicalMessage>() as WalSegmentSize;
let prefix_size = prefix.to_bytes_with_nul().len() as WalSegmentSize;
let data_header_size = match record_size - xlog_header_size - 2 {
0..=255 => 2,
256..=258 => panic!("impossible record_size {record_size}"),

View File

@@ -108,7 +108,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
// parse long header
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD as usize{
return Ok(None);
}
@@ -123,7 +123,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
} else if self.lsn.block_offset() == 0 {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD as usize{
return Ok(None);
}
@@ -153,7 +153,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
// peek xl_tot_len at the beginning of the record.
// FIXME: assumes little-endian
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
if xl_tot_len < XLOG_SIZE_OF_XLOG_RECORD {
return Err(WalDecodeError {
msg: format!("invalid xl_tot_len {xl_tot_len}"),
lsn: self.lsn,
@@ -216,7 +216,7 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError> {
// We now have a record in the 'recordbuf' local variable.
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD as usize]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {e}"),
lsn: self.lsn,

View File

@@ -266,7 +266,7 @@ pub fn decode_wal_record(
xlogrec.xl_info
);
let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
let remaining: usize = (xlogrec.xl_tot_len - XLOG_SIZE_OF_XLOG_RECORD) as usize;
if buf.remaining() != remaining {
//TODO error

View File

@@ -35,7 +35,7 @@ use std::time::SystemTime;
use utils::bin_ser::DeserializeError;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
pub const XLOG_FNAME_LEN: usize = 24;
pub const XLP_BKP_REMOVABLE: u16 = 0x0004;
@@ -43,9 +43,9 @@ pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = size_of::<XLogRecord>();
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: WalSegmentSize = size_of::<XLogPageHeaderData>() as WalSegmentSize;
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: WalSegmentSize = size_of::<XLogLongPageHeaderData>() as WalSegmentSize;
pub const XLOG_SIZE_OF_XLOG_RECORD: WalSegmentSize = size_of::<XLogRecord>() as WalSegmentSize;
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
@@ -58,19 +58,19 @@ pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: WalSegmentSize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
}
pub fn XLogSegNoOffsetToRecPtr(
segno: XLogSegNo,
offset: u32,
wal_segsz_bytes: usize,
wal_segsz_bytes: WalSegmentSize,
) -> XLogRecPtr {
segno * (wal_segsz_bytes as u64) + (offset as u64)
}
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: WalSegmentSize) -> String {
format!(
"{:>08X}{:>08X}{:>08X}",
tli,
@@ -81,7 +81,7 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
pub fn XLogFromFileName(
fname: &OsStr,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
if let Some(fname_str) = fname.to_str() {
let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
@@ -111,7 +111,7 @@ pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
/// If LSN points to the beginning of the page, then shift it to first record,
/// otherwise align on 8-bytes boundary (required for WAL records)
pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
pub fn normalize_lsn(lsn: Lsn, seg_sz: WalSegmentSize) -> Lsn {
if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
XLOG_SIZE_OF_XLOG_LONG_PHD
@@ -227,7 +227,7 @@ pub use timestamp_conversions::{to_pg_timestamp, try_from_pg_timestamp};
// back.
pub fn find_end_of_wal(
data_dir: &Path,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn.
) -> anyhow::Result<Lsn> {
let mut result = start_lsn;
@@ -431,14 +431,14 @@ impl CheckPoint {
/// page of the segment and the page that contains the given LSN.
/// We need this segment to start compute node.
pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE as usize);
let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
let page_off = lsn.block_offset();
let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
let first_page_only = seg_off < XLOG_BLCKSZ;
let first_page_only = seg_off < XLOG_BLCKSZ as WalSegmentSize;
// If first records starts in the middle of the page, pretend in page header
// there is a fake record which ends where first real record starts. This
// makes pg_waldump etc happy.
@@ -460,12 +460,12 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
xlp_tli: PG_TLI,
xlp_pageaddr: pageaddr,
xlp_rem_len: shdr_rem_len as u32,
xlp_rem_len: shdr_rem_len,
..Default::default() // Put 0 in padding fields.
}
},
xlp_sysid: system_id,
xlp_seg_size: WAL_SEGMENT_SIZE as u32,
xlp_seg_size: WAL_SEGMENT_SIZE,
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
};
@@ -473,7 +473,7 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
seg_buf.extend_from_slice(&hdr_bytes);
//zero out the rest of the file
seg_buf.resize(WAL_SEGMENT_SIZE, 0);
seg_buf.resize(WAL_SEGMENT_SIZE as usize, 0);
if !first_page_only {
let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;

View File

@@ -389,7 +389,7 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
let xlog_switch_record_end: PgLsn =
client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
if u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
if (u64::from(xlog_switch_record_end) % XLOG_BLCKSZ as u64) as u32
!= XLOG_SIZE_OF_XLOG_SHORT_PHD
{
warn!(

View File

@@ -81,10 +81,10 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
continue;
}
let mut f = File::options().write(true).open(file.path()).unwrap();
static ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
static ZEROS: [u8; WAL_SEGMENT_SIZE as usize] = [0u8; WAL_SEGMENT_SIZE as usize];
f.write_all(
&ZEROS[0..min(
WAL_SEGMENT_SIZE,
WAL_SEGMENT_SIZE as usize,
(u64::from(*start_lsn) - seg_start_lsn) as usize,
)],
)

View File

@@ -1,22 +1,17 @@
//! A background loop that fetches feature flags from PostHog and updates the feature store.
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use std::{sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span};
use crate::{
CaptureEvent, FeatureStore, LocalEvaluationResponse, PostHogClient, PostHogClientConfig,
};
use crate::{CaptureEvent, FeatureStore, PostHogClient, PostHogClientConfig};
/// A background loop that fetches feature flags from PostHog and updates the feature store.
pub struct FeatureResolverBackgroundLoop {
posthog_client: PostHogClient,
feature_store: ArcSwap<(SystemTime, Arc<FeatureStore>)>,
feature_store: ArcSwap<FeatureStore>,
cancel: CancellationToken,
}
@@ -24,35 +19,11 @@ impl FeatureResolverBackgroundLoop {
pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
Self {
posthog_client: PostHogClient::new(config),
feature_store: ArcSwap::new(Arc::new((
SystemTime::UNIX_EPOCH,
Arc::new(FeatureStore::new()),
))),
feature_store: ArcSwap::new(Arc::new(FeatureStore::new())),
cancel: shutdown_pageserver,
}
}
/// Update the feature store with a new feature flag spec bypassing the normal refresh loop.
pub fn update(&self, spec: String) -> anyhow::Result<()> {
let resp: LocalEvaluationResponse = serde_json::from_str(&spec)?;
self.update_feature_store_nofail(resp, "http_propagate");
Ok(())
}
fn update_feature_store_nofail(&self, resp: LocalEvaluationResponse, source: &'static str) {
let project_id = self.posthog_client.config.project_id.parse::<u64>().ok();
match FeatureStore::new_with_flags(resp.flags, project_id) {
Ok(feature_store) => {
self.feature_store
.store(Arc::new((SystemTime::now(), Arc::new(feature_store))));
tracing::info!("Feature flag updated from {}", source);
}
Err(e) => {
tracing::warn!("Cannot process feature flag spec from {}: {}", source, e);
}
}
}
pub fn spawn(
self: Arc<Self>,
handle: &tokio::runtime::Handle,
@@ -76,17 +47,6 @@ impl FeatureResolverBackgroundLoop {
_ = ticker.tick() => {}
_ = cancel.cancelled() => break
}
{
let last_update = this.feature_store.load().0;
if let Ok(elapsed) = last_update.elapsed() {
if elapsed < refresh_period {
tracing::debug!(
"Skipping feature flag refresh because it's too soon"
);
continue;
}
}
}
let resp = match this
.posthog_client
.get_feature_flags_local_evaluation()
@@ -98,7 +58,16 @@ impl FeatureResolverBackgroundLoop {
continue;
}
};
this.update_feature_store_nofail(resp, "refresh_loop");
let project_id = this.posthog_client.config.project_id.parse::<u64>().ok();
match FeatureStore::new_with_flags(resp.flags, project_id) {
Ok(feature_store) => {
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
}
Err(e) => {
tracing::warn!("Cannot process feature flag spec: {}", e);
}
}
}
tracing::info!("PostHog feature resolver stopped");
}
@@ -123,6 +92,6 @@ impl FeatureResolverBackgroundLoop {
}
pub fn feature_store(&self) -> Arc<FeatureStore> {
self.feature_store.load().1.clone()
self.feature_store.load_full()
}
}

View File

@@ -544,8 +544,17 @@ impl PostHogClient {
self.config.server_api_key.starts_with("phs_")
}
/// Get the raw JSON spec, same as `get_feature_flags_local_evaluation` but without parsing.
pub async fn get_feature_flags_local_evaluation_raw(&self) -> anyhow::Result<String> {
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
/// - <https://posthog.com/docs/api/feature-flags#get-api-projects-project_id-feature_flags-local_evaluation>
/// - <https://posthog.com/docs/feature-flags/local-evaluation>
///
/// The handling logic in [`FeatureStore`] mostly follows the Python API implementation.
/// See `_compute_flag_locally` in <https://github.com/PostHog/posthog-python/blob/master/posthog/client.py>
pub async fn get_feature_flags_local_evaluation(
&self,
) -> anyhow::Result<LocalEvaluationResponse> {
// BASE_URL/api/projects/:project_id/feature_flags/local_evaluation
// with bearer token of self.server_api_key
// OR
@@ -579,22 +588,7 @@ impl PostHogClient {
body
));
}
Ok(body)
}
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
/// - <https://posthog.com/docs/api/feature-flags#get-api-projects-project_id-feature_flags-local_evaluation>
/// - <https://posthog.com/docs/feature-flags/local-evaluation>
///
/// The handling logic in [`FeatureStore`] mostly follows the Python API implementation.
/// See `_compute_flag_locally` in <https://github.com/PostHog/posthog-python/blob/master/posthog/client.py>
pub async fn get_feature_flags_local_evaluation(
&self,
) -> Result<LocalEvaluationResponse, anyhow::Error> {
let raw = self.get_feature_flags_local_evaluation_raw().await?;
Ok(serde_json::from_str(&raw)?)
Ok(serde_json::from_str(&body)?)
}
/// Capture an event. This will only be used to report the feature flag usage back to PostHog, though

View File

@@ -12,9 +12,7 @@ use tokio::net::TcpStream;
use crate::connect::connect;
use crate::connect_raw::{RawConnection, connect_raw};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{MakeTlsConnect, TlsConnect, TlsStream};
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Connection, Error};
/// TLS configuration.
@@ -240,7 +238,7 @@ impl Config {
connect(tls, self).await
}
pub async fn tls_and_authenticate<S, T>(
pub async fn connect_raw<S, T>(
&self,
stream: S,
tls: T,
@@ -249,19 +247,7 @@ impl Config {
S: AsyncRead + AsyncWrite + Unpin,
T: TlsConnect<S>,
{
let stream = connect_tls(stream, self.ssl_mode, tls).await?;
connect_raw(stream, self).await
}
pub async fn authenticate<S, T>(
&self,
stream: MaybeTlsStream<S, T>,
) -> Result<RawConnection<S, T>, Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsStream + Unpin,
{
connect_raw(stream, self).await
connect_raw(stream, tls, self).await
}
}

View File

@@ -9,7 +9,6 @@ use crate::codec::BackendMessage;
use crate::config::Host;
use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
use crate::connect_tls::connect_tls;
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Config, Connection, Error, RawConnection};
@@ -45,14 +44,13 @@ where
T: TlsConnect<TcpStream>,
{
let socket = connect_socket(host_addr, host, port, config.connect_timeout).await?;
let stream = connect_tls(socket, config.ssl_mode, tls).await?;
let RawConnection {
stream,
parameters,
delayed_notice,
process_id,
secret_key,
} = connect_raw(stream, config).await?;
} = connect_raw(socket, tls, config).await?;
let socket_config = SocketConfig {
host_addr,

View File

@@ -16,8 +16,9 @@ use tokio_util::codec::Framed;
use crate::Error;
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::config::{self, AuthKeys, Config};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::TlsStream;
use crate::tls::{TlsConnect, TlsStream};
pub struct StartupStream<S, T> {
inner: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
@@ -86,13 +87,16 @@ pub struct RawConnection<S, T> {
}
pub async fn connect_raw<S, T>(
stream: MaybeTlsStream<S, T>,
stream: S,
tls: T,
config: &Config,
) -> Result<RawConnection<S, T>, Error>
) -> Result<RawConnection<S, T::Stream>, Error>
where
S: AsyncRead + AsyncWrite + Unpin,
T: TlsStream + Unpin,
T: TlsConnect<S>,
{
let stream = connect_tls(stream, config.ssl_mode, tls).await?;
let mut stream = StartupStream {
inner: Framed::new(stream, PostgresCodec),
buf: BackendMessages::empty(),

View File

@@ -17,6 +17,9 @@ pub const XLOG_BLCKSZ: u32 = 8192;
#[derive(Clone, Copy, Default, Eq, Ord, PartialEq, PartialOrd, Hash)]
pub struct Lsn(pub u64);
/// Size of a Postgres WAL segment. These are always small enough to fit in a u32.
pub type WalSegmentSize = u32;
impl Serialize for Lsn {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -163,19 +166,19 @@ impl Lsn {
/// Compute the offset into a segment
#[inline]
pub fn segment_offset(self, seg_sz: usize) -> usize {
(self.0 % seg_sz as u64) as usize
pub fn segment_offset(self, seg_sz: WalSegmentSize) -> WalSegmentSize {
(self.0 % seg_sz as u64) as WalSegmentSize
}
/// Compute LSN of the segment start.
#[inline]
pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
pub fn segment_lsn(self, seg_sz: WalSegmentSize) -> Lsn {
Lsn(self.0 - (self.0 % seg_sz as u64))
}
/// Compute the segment number
#[inline]
pub fn segment_number(self, seg_sz: usize) -> u64 {
pub fn segment_number(self, seg_sz: WalSegmentSize) -> u64 {
self.0 / seg_sz as u64
}
@@ -196,7 +199,7 @@ impl Lsn {
/// Compute the block offset of the first byte of this Lsn within this
/// segment
#[inline]
pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
pub fn page_offset_in_segment(self, seg_sz: WalSegmentSize) -> u64 {
(self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
}
@@ -463,7 +466,7 @@ mod tests {
assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX));
assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX));
let seg_sz: usize = 16 * 1024 * 1024;
let seg_sz: WalSegmentSize = 16 * 1024 * 1024;
assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);

View File

@@ -226,9 +226,9 @@ fn decode_interpret_main(bench: &BenchmarkData, shards: &[ShardIdentity]) {
fn decode_interpret(bench: &BenchmarkData, shard: &[ShardIdentity]) -> anyhow::Result<()> {
let mut decoder = WalStreamDecoder::new(bench.meta.start_lsn, bench.meta.pg_version);
let xlogoff: usize = bench.meta.start_lsn.segment_offset(WAL_SEGMENT_SIZE);
let xlogoff = bench.meta.start_lsn.segment_offset(WAL_SEGMENT_SIZE);
for chunk in bench.wal[xlogoff..].chunks(MAX_SEND_SIZE) {
for chunk in bench.wal[xlogoff as usize..].chunks(MAX_SEND_SIZE) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
assert!(lsn.is_aligned());

View File

@@ -844,13 +844,4 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}
pub async fn update_feature_flag_spec(&self, spec: String) -> Result<()> {
let uri = format!("{}/v1/feature_flag_spec", self.mgmt_api_endpoint);
self.request(Method::POST, uri, spec)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -110,19 +110,6 @@ message GetBaseBackupRequest {
bool replica = 2;
// If true, include relation files in the base backup. Mainly for debugging and tests.
bool full = 3;
// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
// compression, so that we can cache compressed backups on the server.
BaseBackupCompression compression = 4;
}
// Base backup compression algorithms.
enum BaseBackupCompression {
// Unknown algorithm. Used when clients send an unsupported algorithm.
BASE_BACKUP_COMPRESSION_UNKNOWN = 0;
// No compression.
BASE_BACKUP_COMPRESSION_NONE = 1;
// GZIP compression.
BASE_BACKUP_COMPRESSION_GZIP = 2;
}
// Base backup response chunk, returned as an ordered stream.

View File

@@ -95,6 +95,7 @@ impl Client {
if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
// TODO: consider enabling compression by default.
client = client
.accept_compressed(compression)
.send_compressed(compression);

View File

@@ -191,21 +191,15 @@ pub struct GetBaseBackupRequest {
pub replica: bool,
/// If true, include relation files in the base backup. Mainly for debugging and tests.
pub full: bool,
/// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
/// compression, so that we can cache compressed backups on the server.
pub compression: BaseBackupCompression,
}
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
Ok(Self {
impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
fn from(pb: proto::GetBaseBackupRequest) -> Self {
Self {
lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
replica: pb.replica,
full: pb.full,
compression: pb.compression.try_into()?,
})
}
}
}
@@ -215,55 +209,10 @@ impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
lsn: request.lsn.unwrap_or_default().0,
replica: request.replica,
full: request.full,
compression: request.compression.into(),
}
}
}
/// Base backup compression algorithm.
#[derive(Clone, Copy, Debug)]
pub enum BaseBackupCompression {
None,
Gzip,
}
impl TryFrom<proto::BaseBackupCompression> for BaseBackupCompression {
type Error = ProtocolError;
fn try_from(pb: proto::BaseBackupCompression) -> Result<Self, Self::Error> {
match pb {
proto::BaseBackupCompression::Unknown => Err(ProtocolError::invalid("compression", pb)),
proto::BaseBackupCompression::None => Ok(Self::None),
proto::BaseBackupCompression::Gzip => Ok(Self::Gzip),
}
}
}
impl TryFrom<i32> for BaseBackupCompression {
type Error = ProtocolError;
fn try_from(compression: i32) -> Result<Self, Self::Error> {
proto::BaseBackupCompression::try_from(compression)
.map_err(|_| ProtocolError::invalid("compression", compression))
.and_then(Self::try_from)
}
}
impl From<BaseBackupCompression> for proto::BaseBackupCompression {
fn from(compression: BaseBackupCompression) -> Self {
match compression {
BaseBackupCompression::None => Self::None,
BaseBackupCompression::Gzip => Self::Gzip,
}
}
}
impl From<BaseBackupCompression> for i32 {
fn from(compression: BaseBackupCompression) -> Self {
proto::BaseBackupCompression::from(compression).into()
}
}
pub type GetBaseBackupResponseChunk = Bytes;
impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {

View File

@@ -317,7 +317,6 @@ impl Client for LibpqClient {
/// A gRPC Pageserver client.
struct GrpcClient {
inner: page_api::Client,
compression: page_api::BaseBackupCompression,
}
impl GrpcClient {
@@ -332,14 +331,10 @@ impl GrpcClient {
ttid.timeline_id,
ShardIndex::unsharded(),
None,
None, // NB: uses payload compression
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
)
.await?;
let compression = match compression {
true => page_api::BaseBackupCompression::Gzip,
false => page_api::BaseBackupCompression::None,
};
Ok(Self { inner, compression })
Ok(Self { inner })
}
}
@@ -353,7 +348,6 @@ impl Client for GrpcClient {
lsn,
replica: false,
full: false,
compression: self.compression,
};
let stream = self.inner.get_base_backup(req).await?;
Ok(Box::pin(StreamReader::new(

View File

@@ -14,7 +14,6 @@ use std::fmt::Write as FmtWrite;
use std::time::{Instant, SystemTime};
use anyhow::{Context, anyhow};
use async_compression::tokio::write::GzipEncoder;
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{Key, rel_block_to_key};
@@ -26,10 +25,11 @@ use postgres_ffi::{
};
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM};
use tokio::io::{self, AsyncWrite, AsyncWriteExt as _};
use tokio::io;
use tokio::io::AsyncWrite;
use tokio_tar::{Builder, EntryType, Header};
use tracing::*;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use crate::context::RequestContext;
use crate::pgdatadir_mapping::Version;
@@ -97,7 +97,6 @@ impl From<BasebackupError> for tonic::Status {
/// * When working without safekeepers. In this situation it is important to match the lsn
/// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
/// to start the replication.
#[allow(clippy::too_many_arguments)]
pub async fn send_basebackup_tarball<'a, W>(
write: &'a mut W,
timeline: &'a Timeline,
@@ -105,7 +104,6 @@ pub async fn send_basebackup_tarball<'a, W>(
prev_lsn: Option<Lsn>,
full_backup: bool,
replica: bool,
gzip_level: Option<async_compression::Level>,
ctx: &'a RequestContext,
) -> Result<(), BasebackupError>
where
@@ -124,7 +122,7 @@ where
// prev_lsn value; that happens if the timeline was just branched from
// an old LSN and it doesn't have any WAL of its own yet. We will set
// prev_lsn to Lsn(0) if we cannot provide the correct value.
let (backup_prev, lsn) = if let Some(req_lsn) = req_lsn {
let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
// Backup was requested at a particular LSN. The caller should've
// already checked that it's a valid LSN.
@@ -145,7 +143,7 @@ where
};
// Consolidate the derived and the provided prev_lsn values
let prev_record_lsn = if let Some(provided_prev_lsn) = prev_lsn {
let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
return Err(BasebackupError::Server(anyhow!(
"backup_prev {backup_prev} != provided_prev_lsn {provided_prev_lsn}"
@@ -157,55 +155,30 @@ where
};
info!(
"taking basebackup lsn={lsn}, prev_lsn={prev_record_lsn} \
(full_backup={full_backup}, replica={replica}, gzip={gzip_level:?})",
);
let span = info_span!("send_tarball", backup_lsn=%lsn);
let io_concurrency = IoConcurrency::spawn_from_conf(
timeline.conf.get_vectored_concurrent_io,
timeline
.gate
.enter()
.map_err(|_| BasebackupError::Shutdown)?,
"taking basebackup lsn={}, prev_lsn={} (full_backup={}, replica={})",
backup_lsn, prev_lsn, full_backup, replica
);
if let Some(gzip_level) = gzip_level {
let mut encoder = GzipEncoder::with_quality(write, gzip_level);
Basebackup {
ar: Builder::new_non_terminated(&mut encoder),
timeline,
lsn,
prev_record_lsn,
full_backup,
replica,
ctx,
io_concurrency,
}
let basebackup = Basebackup {
ar: Builder::new_non_terminated(write),
timeline,
lsn: backup_lsn,
prev_record_lsn: prev_lsn,
full_backup,
replica,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf.get_vectored_concurrent_io,
timeline
.gate
.enter()
.map_err(|_| BasebackupError::Shutdown)?,
),
};
basebackup
.send_tarball()
.instrument(span)
.await?;
encoder
.shutdown()
.await
.map_err(|err| BasebackupError::Client(err, "gzip"))?;
} else {
Basebackup {
ar: Builder::new_non_terminated(write),
timeline,
lsn,
prev_record_lsn,
full_backup,
replica,
ctx,
io_concurrency,
}
.send_tarball()
.instrument(span)
.await?;
}
Ok(())
.instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
.await
}
/// This is short-living object only for the time of tarball creation,
@@ -800,7 +773,7 @@ where
self.lsn,
)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
if wal_seg.len() != WAL_SEGMENT_SIZE {
if WalSegmentSize::try_from(wal_seg.len()) != Ok(WAL_SEGMENT_SIZE) {
return Err(BasebackupError::Server(anyhow!(
"wal_seg.len() != WAL_SEGMENT_SIZE, wal_seg.len()={}",
wal_seg.len()

View File

@@ -1,12 +1,13 @@
use std::{collections::HashMap, sync::Arc};
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
use tokio::{
io::{AsyncWriteExt, BufWriter},
sync::mpsc::{Receiver, Sender, error::TrySendError},
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;
use utils::{
@@ -19,8 +20,8 @@ use crate::{
basebackup::send_basebackup_tarball,
context::{DownloadBehavior, RequestContext},
metrics::{
BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE,
BASEBACKUP_CACHE_READ, BASEBACKUP_CACHE_SIZE,
BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ,
BASEBACKUP_CACHE_SIZE,
},
task_mgr::TaskKind,
tenant::{
@@ -35,8 +36,8 @@ pub struct BasebackupPrepareRequest {
pub lsn: Lsn,
}
pub type BasebackupPrepareSender = Sender<BasebackupPrepareRequest>;
pub type BasebackupPrepareReceiver = Receiver<BasebackupPrepareRequest>;
pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
#[derive(Clone)]
struct CacheEntry {
@@ -60,65 +61,40 @@ struct CacheEntry {
/// and ~1 RPS for get requests.
pub struct BasebackupCache {
data_dir: Utf8PathBuf,
config: Option<BasebackupCacheConfig>,
entries: std::sync::Mutex<HashMap<TenantTimelineId, CacheEntry>>,
prepare_sender: BasebackupPrepareSender,
read_hit_count: GenericCounter<AtomicU64>,
read_miss_count: GenericCounter<AtomicU64>,
read_err_count: GenericCounter<AtomicU64>,
prepare_skip_count: GenericCounter<AtomicU64>,
}
impl BasebackupCache {
/// Create a new BasebackupCache instance.
/// Also returns a BasebackupPrepareReceiver which is needed to start
/// the background task.
/// The cache is initialized from the data_dir in the background task.
/// The cache will return `None` for any get requests until the initialization is complete.
/// The background task is spawned separately using [`Self::spawn_background_task`]
/// to avoid a circular dependency between the cache and the tenant manager.
pub fn new(
/// Creates a BasebackupCache and spawns the background task.
/// The initialization of the cache is performed in the background and does not
/// block the caller. The cache will return `None` for any get requests until
/// initialization is complete.
pub fn spawn(
runtime_handle: &tokio::runtime::Handle,
data_dir: Utf8PathBuf,
config: Option<BasebackupCacheConfig>,
) -> (Arc<Self>, BasebackupPrepareReceiver) {
let chan_size = config.as_ref().map(|c| c.max_size_entries).unwrap_or(1);
let (prepare_sender, prepare_receiver) = tokio::sync::mpsc::channel(chan_size);
prepare_receiver: BasebackupPrepareReceiver,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) -> Arc<Self> {
let cache = Arc::new(BasebackupCache {
data_dir,
config,
entries: std::sync::Mutex::new(HashMap::new()),
prepare_sender,
read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
});
(cache, prepare_receiver)
}
/// Spawns the background task.
/// The background task initializes the cache from the disk,
/// processes prepare requests, and cleans up outdated cache entries.
/// Noop if the cache is disabled (config is None).
pub fn spawn_background_task(
self: Arc<Self>,
runtime_handle: &tokio::runtime::Handle,
prepare_receiver: BasebackupPrepareReceiver,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) {
if let Some(config) = self.config.clone() {
if let Some(config) = config {
let background = BackgroundTask {
c: self,
c: cache.clone(),
config,
tenant_manager,
@@ -133,45 +109,8 @@ impl BasebackupCache {
};
runtime_handle.spawn(background.run(prepare_receiver));
}
}
/// Send a basebackup prepare request to the background task.
/// The basebackup will be prepared asynchronously, it does not block the caller.
/// The request will be skipped if any cache limits are exceeded.
pub fn send_prepare(&self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, lsn: Lsn) {
let req = BasebackupPrepareRequest {
tenant_shard_id,
timeline_id,
lsn,
};
BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.inc();
let res = self.prepare_sender.try_send(req);
if let Err(e) = res {
BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec();
self.prepare_skip_count.inc();
match e {
TrySendError::Full(_) => {
// Basebackup prepares are pretty rare, normally we should not hit this.
tracing::info!(
tenant_id = %tenant_shard_id.tenant_id,
%timeline_id,
%lsn,
"Basebackup prepare channel is full, skipping the request"
);
}
TrySendError::Closed(_) => {
// Normal during shutdown, not critical.
tracing::info!(
tenant_id = %tenant_shard_id.tenant_id,
%timeline_id,
%lsn,
"Basebackup prepare channel is closed, skipping the request"
);
}
}
}
cache
}
/// Gets a basebackup entry from the cache.
@@ -184,10 +123,6 @@ impl BasebackupCache {
timeline_id: TimelineId,
lsn: Lsn,
) -> Option<tokio::fs::File> {
if !self.is_enabled() {
return None;
}
// Fast path. Check if the entry exists using the in-memory state.
let tti = TenantTimelineId::new(tenant_id, timeline_id);
if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) {
@@ -215,10 +150,6 @@ impl BasebackupCache {
}
}
pub fn is_enabled(&self) -> bool {
self.config.is_some()
}
// Private methods.
fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
@@ -436,7 +367,6 @@ impl BackgroundTask {
loop {
tokio::select! {
Some(req) = prepare_receiver.recv() => {
BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec();
if let Err(err) = self.prepare_basebackup(
req.tenant_shard_id,
req.timeline_id,
@@ -664,6 +594,13 @@ impl BackgroundTask {
let file = tokio::fs::File::create(entry_tmp_path).await?;
let mut writer = BufWriter::new(file);
let mut encoder = GzipEncoder::with_quality(
&mut writer,
// Level::Best because compression is not on the hot path of basebackup requests.
// The decompression is almost not affected by the compression level.
async_compression::Level::Best,
);
// We may receive a request before the WAL record is applied to the timeline.
// Wait for the requested LSN to be applied.
timeline
@@ -676,19 +613,17 @@ impl BackgroundTask {
.await?;
send_basebackup_tarball(
&mut writer,
&mut encoder,
timeline,
Some(req_lsn),
None,
false,
false,
// Level::Best because compression is not on the hot path of basebackup requests.
// The decompression is almost not affected by the compression level.
Some(async_compression::Level::Best),
&ctx,
)
.await?;
encoder.shutdown().await?;
writer.flush().await?;
writer.into_inner().sync_all().await?;

View File

@@ -9,7 +9,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, anyhow, bail};
use anyhow::{Context, anyhow};
use camino::Utf8Path;
use clap::{Arg, ArgAction, Command};
use http_utils::tls_certs::ReloadingCertificateResolver;
@@ -102,19 +102,6 @@ fn main() -> anyhow::Result<()> {
let (conf, ignored) = initialize_config(&identity_file_path, &cfg_file_path, &workdir)?;
if !conf.dev_mode {
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type].contains(&AuthType::Trust)
{
bail!(
"Pageserver refuses to start with HTTP, PostgreSQL or GRPC API authentication disabled.\n\
Set dev_mode = true in pageserver.toml to allow running without authentication.\n\
This is insecure and should only be used in development environments."
);
}
} else {
warn!("Starting in dev mode: this may be an insecure configuration.");
}
// Initialize logging.
//
// It must be initialized before the custom panic hook is installed below.
@@ -582,10 +569,8 @@ fn start_pageserver(
pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone());
// Scan the local 'tenants/' directory and start loading the tenants
let (basebackup_cache, basebackup_prepare_receiver) = BasebackupCache::new(
conf.basebackup_cache_dir(),
conf.basebackup_cache_config.clone(),
);
let (basebackup_prepare_sender, basebackup_prepare_receiver) =
tokio::sync::mpsc::unbounded_channel();
let deletion_queue_client = deletion_queue.new_client();
let background_purges = mgr::BackgroundPurges::default();
@@ -597,7 +582,7 @@ fn start_pageserver(
remote_storage: remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
basebackup_cache: Arc::clone(&basebackup_cache),
basebackup_prepare_sender,
feature_resolver: feature_resolver.clone(),
},
shutdown_pageserver.clone(),
@@ -605,8 +590,10 @@ fn start_pageserver(
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(tenant_manager.clone(), order))?;
basebackup_cache.spawn_background_task(
let basebackup_cache = BasebackupCache::spawn(
BACKGROUND_RUNTIME.handle(),
conf.basebackup_cache_dir(),
conf.basebackup_cache_config.clone(),
basebackup_prepare_receiver,
Arc::clone(&tenant_manager),
shutdown_pageserver.child_token(),
@@ -819,6 +806,7 @@ fn start_pageserver(
} else {
None
},
basebackup_cache,
);
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for

View File

@@ -762,23 +762,4 @@ mod tests {
let result = PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir);
assert_eq!(result.is_ok(), is_valid);
}
#[test]
fn test_config_posthog_config_is_valid() {
let input = r#"
control_plane_api = "http://localhost:6666"
[posthog_config]
server_api_key = "phs_AAA"
client_api_key = "phc_BBB"
project_id = "000"
private_api_url = "https://us.posthog.com"
public_api_url = "https://us.i.posthog.com"
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("posthogconfig is valid");
let workdir = Utf8PathBuf::from("/nonexistent");
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect("parse_and_validate");
}
}

View File

@@ -31,13 +31,6 @@ impl FeatureResolver {
}
}
pub fn update(&self, spec: String) -> anyhow::Result<()> {
if let Some(inner) = &self.inner {
inner.update(spec)?;
}
Ok(())
}
pub fn spawn(
conf: &PageServerConf,
shutdown_pageserver: CancellationToken,

View File

@@ -3743,20 +3743,6 @@ async fn force_override_feature_flag_for_testing_delete(
json_response(StatusCode::OK, ())
}
async fn update_feature_flag_spec(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let body = json_request(&mut request).await?;
let state = get_state(&request);
state
.feature_resolver
.update(body)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -4142,8 +4128,5 @@ pub fn make_router(
.delete("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - delete", r, force_override_feature_flag_for_testing_delete)
})
.post("/v1/feature_flag_spec", |r| {
api_handler(r, update_feature_flag_spec)
})
.any(handler_404))
}

View File

@@ -301,7 +301,7 @@ async fn import_wal(
use std::io::Read;
let nread = file.read_to_end(&mut buf)?;
if nread != WAL_SEGMENT_SIZE - offset {
if nread != WAL_SEGMENT_SIZE as usize - offset as usize {
// Maybe allow this for .partial files?
error!("read only {} bytes from WAL file", nread);
}
@@ -455,7 +455,7 @@ pub async fn import_wal_from_tar(
}
};
waldecoder.feed_bytes(&bytes[offset..]);
waldecoder.feed_bytes(&bytes[offset as usize..]);
let mut modification = tline.begin_modification(last_lsn);
while last_lsn <= end_lsn {

View File

@@ -4439,14 +4439,6 @@ pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<UIntGauge> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_basebackup_cache_prepare_queue_size",
"Number of requests in the basebackup prepare channel"
)
.expect("failed to define a metric")
});
static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_config_ignored_items",

View File

@@ -13,6 +13,7 @@ use std::time::{Duration, Instant, SystemTime};
use std::{io, str};
use anyhow::{Context as _, anyhow, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::{Buf as _, BufMut as _, BytesMut};
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
@@ -62,6 +63,7 @@ use utils::{failpoint_support, span_record};
use crate::auth::check_permission;
use crate::basebackup::{self, BasebackupError};
use crate::basebackup_cache::BasebackupCache;
use crate::config::PageServerConf;
use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
@@ -136,6 +138,7 @@ pub fn spawn(
perf_trace_dispatch: Option<Dispatch>,
tcp_listener: tokio::net::TcpListener,
tls_config: Option<Arc<rustls::ServerConfig>>,
basebackup_cache: Arc<BasebackupCache>,
) -> Listener {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
@@ -157,6 +160,7 @@ pub fn spawn(
conf.pg_auth_type,
tls_config,
conf.page_service_pipelining.clone(),
basebackup_cache,
libpq_ctx,
cancel.clone(),
)
@@ -215,6 +219,7 @@ pub async fn libpq_listener_main(
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
basebackup_cache: Arc<BasebackupCache>,
listener_ctx: RequestContext,
listener_cancel: CancellationToken,
) -> Connections {
@@ -258,6 +263,7 @@ pub async fn libpq_listener_main(
auth_type,
tls_config.clone(),
pipelining_config.clone(),
Arc::clone(&basebackup_cache),
connection_ctx,
connections_cancel.child_token(),
gate_guard,
@@ -300,6 +306,7 @@ async fn page_service_conn_main(
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
basebackup_cache: Arc<BasebackupCache>,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
@@ -365,6 +372,7 @@ async fn page_service_conn_main(
pipelining_config,
conf.get_vectored_concurrent_io,
perf_span_fields,
basebackup_cache,
connection_ctx,
cancel.clone(),
gate_guard,
@@ -418,6 +426,8 @@ struct PageServerHandler {
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
basebackup_cache: Arc<BasebackupCache>,
gate_guard: GateGuard,
}
@@ -903,6 +913,7 @@ impl PageServerHandler {
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
perf_span_fields: ConnectionPerfSpanFields,
basebackup_cache: Arc<BasebackupCache>,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
@@ -916,6 +927,7 @@ impl PageServerHandler {
cancel,
pipelining_config,
get_vectored_concurrent_io,
basebackup_cache,
gate_guard,
}
}
@@ -2601,7 +2613,6 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
None,
&ctx,
)
.await?;
@@ -2615,7 +2626,9 @@ impl PageServerHandler {
&& lsn.is_some()
&& prev_lsn.is_none()
{
timeline.get_cached_basebackup(lsn.unwrap()).await
self.basebackup_cache
.get(tenant_id, timeline_id, lsn.unwrap())
.await
} else {
None
}
@@ -2628,6 +2641,31 @@ impl PageServerHandler {
.map_err(|err| {
BasebackupError::Client(err, "handle_basebackup_request,cached,copy")
})?;
} else if gzip {
let mut encoder = GzipEncoder::with_quality(
&mut writer,
// NOTE using fast compression because it's on the critical path
// for compute startup. For an empty database, we get
// <100KB with this method. The Level::Best compression method
// gives us <20KB, but maybe we should add basebackup caching
// on compute shutdown first.
async_compression::Level::Fastest,
);
basebackup::send_basebackup_tarball(
&mut encoder,
&timeline,
lsn,
prev_lsn,
full_backup,
replica,
&ctx,
)
.await?;
// shutdown the encoder to ensure the gzip footer is written
encoder
.shutdown()
.await
.map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
} else {
basebackup::send_basebackup_tarball(
&mut writer,
@@ -2636,11 +2674,6 @@ impl PageServerHandler {
prev_lsn,
full_backup,
replica,
// NB: using fast compression because it's on the critical path for compute
// startup. For an empty database, we get <100KB with this method. The
// Level::Best compression method gives us <20KB, but maybe we should add
// basebackup caching on compute shutdown first.
gzip.then_some(async_compression::Level::Fastest),
&ctx,
)
.await?;
@@ -3520,7 +3553,7 @@ impl proto::PageService for GrpcPageServiceHandler {
if timeline.is_archived() == Some(true) {
return Err(tonic::Status::failed_precondition("timeline is archived"));
}
let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
let req: page_api::GetBaseBackupRequest = req.into_inner().into();
span_record!(lsn=?req.lsn);
@@ -3546,15 +3579,6 @@ impl proto::PageService for GrpcPageServiceHandler {
let span = Span::current();
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
let jh = tokio::spawn(async move {
let gzip_level = match req.compression {
page_api::BaseBackupCompression::None => None,
// NB: using fast compression because it's on the critical path for compute
// startup. For an empty database, we get <100KB with this method. The
// Level::Best compression method gives us <20KB, but maybe we should add
// basebackup caching on compute shutdown first.
page_api::BaseBackupCompression::Gzip => Some(async_compression::Level::Fastest),
};
let result = basebackup::send_basebackup_tarball(
&mut simplex_write,
&timeline,
@@ -3562,7 +3586,6 @@ impl proto::PageService for GrpcPageServiceHandler {
None,
req.full,
req.replica,
gzip_level,
&ctx,
)
.instrument(span) // propagate request span

View File

@@ -80,7 +80,7 @@ use self::timeline::uninit::{TimelineCreateGuard, TimelineExclusionError, Uninit
use self::timeline::{
EvictionTaskTenantState, GcCutoffs, TimelineDeleteProgress, TimelineResources, WaitLsnError,
};
use crate::basebackup_cache::BasebackupCache;
use crate::basebackup_cache::BasebackupPrepareSender;
use crate::config::PageServerConf;
use crate::context;
use crate::context::RequestContextBuilder;
@@ -162,7 +162,7 @@ pub struct TenantSharedResources {
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
pub basebackup_cache: Arc<BasebackupCache>,
pub basebackup_prepare_sender: BasebackupPrepareSender,
pub feature_resolver: FeatureResolver,
}
@@ -331,7 +331,7 @@ pub struct TenantShard {
deletion_queue_client: DeletionQueueClient,
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_cache: Arc<BasebackupCache>,
basebackup_prepare_sender: BasebackupPrepareSender,
/// Cached logical sizes updated updated on each [`TenantShard::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
@@ -1363,7 +1363,7 @@ impl TenantShard {
remote_storage,
deletion_queue_client,
l0_flush_global_state,
basebackup_cache,
basebackup_prepare_sender,
feature_resolver,
} = resources;
@@ -1380,7 +1380,7 @@ impl TenantShard {
remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
basebackup_cache,
basebackup_prepare_sender,
feature_resolver,
));
@@ -4380,7 +4380,7 @@ impl TenantShard {
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
basebackup_cache: Arc<BasebackupCache>,
basebackup_prepare_sender: BasebackupPrepareSender,
feature_resolver: FeatureResolver,
) -> TenantShard {
assert!(!attached_conf.location.generation.is_none());
@@ -4485,7 +4485,7 @@ impl TenantShard {
ongoing_timeline_detach: std::sync::Mutex::default(),
gc_block: Default::default(),
l0_flush_global_state,
basebackup_cache,
basebackup_prepare_sender,
feature_resolver,
}
}
@@ -5414,7 +5414,7 @@ impl TenantShard {
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
basebackup_cache: self.basebackup_cache.clone(),
basebackup_prepare_sender: self.basebackup_prepare_sender.clone(),
feature_resolver: self.feature_resolver.clone(),
}
}
@@ -6000,7 +6000,7 @@ pub(crate) mod harness {
) -> anyhow::Result<Arc<TenantShard>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None);
let (basebackup_requst_sender, _) = tokio::sync::mpsc::unbounded_channel();
let tenant = Arc::new(TenantShard::new(
TenantState::Attaching,
@@ -6018,7 +6018,7 @@ pub(crate) mod harness {
self.deletion_queue.new_client(),
// TODO: ideally we should run all unit tests with both configs
L0FlushGlobalState::new(L0FlushConfig::default()),
basebackup_cache,
basebackup_requst_sender,
FeatureResolver::new_disabled(),
));

View File

@@ -2891,18 +2891,14 @@ mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;
use camino::Utf8PathBuf;
use storage_broker::BrokerClientChannel;
use tracing::Instrument;
use super::super::harness::TenantHarness;
use super::TenantsMap;
use crate::{
basebackup_cache::BasebackupCache,
tenant::{
TenantSharedResources,
mgr::{BackgroundPurges, TenantManager, TenantSlot},
},
use crate::tenant::{
TenantSharedResources,
mgr::{BackgroundPurges, TenantManager, TenantSlot},
};
#[tokio::test(start_paused = true)]
@@ -2928,7 +2924,9 @@ mod tests {
// Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
// permit it to proceed: that will stick the tenant in InProgress
let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None);
let (basebackup_prepare_sender, _) = tokio::sync::mpsc::unbounded_channel::<
crate::basebackup_cache::BasebackupPrepareRequest,
>();
let tenant_manager = TenantManager {
tenants: std::sync::RwLock::new(TenantsMap::Open(tenants)),
@@ -2942,7 +2940,7 @@ mod tests {
l0_flush_global_state: crate::l0_flush::L0FlushGlobalState::new(
h.conf.l0_flush.clone(),
),
basebackup_cache,
basebackup_prepare_sender,
feature_resolver: crate::feature_resolver::FeatureResolver::new_disabled(),
},
cancel: tokio_util::sync::CancellationToken::new(),

View File

@@ -95,12 +95,12 @@ use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::tasks::log_compaction_error;
use super::upload_queue::NotInitialized;
use super::{
AttachedTenantConf, GcError, HeatMapTimeline, MaybeOffloaded,
AttachedTenantConf, BasebackupPrepareSender, GcError, HeatMapTimeline, MaybeOffloaded,
debug_assert_current_span_has_tenant_and_timeline_id,
};
use crate::PERF_TRACE_TARGET;
use crate::aux_file::AuxFileSizeEstimator;
use crate::basebackup_cache::BasebackupCache;
use crate::basebackup_cache::BasebackupPrepareRequest;
use crate::config::PageServerConf;
use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
@@ -201,7 +201,7 @@ pub struct TimelineResources {
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
pub basebackup_cache: Arc<BasebackupCache>,
pub basebackup_prepare_sender: BasebackupPrepareSender,
pub feature_resolver: FeatureResolver,
}
@@ -448,7 +448,7 @@ pub struct Timeline {
wait_lsn_log_slow: tokio::sync::Semaphore,
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_cache: Arc<BasebackupCache>,
basebackup_prepare_sender: BasebackupPrepareSender,
feature_resolver: FeatureResolver,
}
@@ -2500,13 +2500,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.basebackup_cache_enabled)
}
/// Try to get a basebackup from the on-disk cache.
pub(crate) async fn get_cached_basebackup(&self, lsn: Lsn) -> Option<tokio::fs::File> {
self.basebackup_cache
.get(self.tenant_shard_id.tenant_id, self.timeline_id, lsn)
.await
}
/// Prepare basebackup for the given LSN and store it in the basebackup cache.
/// The method is asynchronous and returns immediately.
/// The actual basebackup preparation is performed in the background
@@ -2528,8 +2521,17 @@ impl Timeline {
return;
}
self.basebackup_cache
.send_prepare(self.tenant_shard_id, self.timeline_id, lsn);
let res = self
.basebackup_prepare_sender
.send(BasebackupPrepareRequest {
tenant_shard_id: self.tenant_shard_id,
timeline_id: self.timeline_id,
lsn,
});
if let Err(e) = res {
// May happen during shutdown, it's not critical.
info!("Failed to send shutdown checkpoint: {e:#}");
}
}
}
@@ -3086,7 +3088,7 @@ impl Timeline {
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
basebackup_cache: resources.basebackup_cache,
basebackup_prepare_sender: resources.basebackup_prepare_sender,
feature_resolver: resources.feature_resolver,
};

View File

@@ -275,20 +275,12 @@ pub(super) async fn handle_walreceiver_connection(
let copy_stream = replication_client.copy_both_simple(&query).await?;
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx);
let walingest_res = select! {
walingest_res = walingest_future => walingest_res,
_ = cancellation.cancelled() => {
// We are doing reads in WalIngest::new, and those can hang as they come from the network.
// Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one.
debug!("Connection cancelled");
return Err(WalReceiverError::Cancelled);
},
};
let mut walingest = walingest_res.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let (format, compression) = match protocol {
PostgresClientProtocol::Interpreted {

View File

@@ -2383,17 +2383,17 @@ mod tests {
let started_at = std::time::Instant::now();
// Initialize walingest
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let xlogoff = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
.await
.unwrap();
let mut modification = tline.begin_modification(startpoint);
println!("decoding {} bytes", bytes.len() - xlogoff);
println!("decoding {} bytes", bytes.len() - xlogoff as usize);
// Decode and ingest wal. We process the wal in chunks because
// that's what happens when we get bytes from safekeepers.
for chunk in bytes[xlogoff..].chunks(50) {
for chunk in bytes[xlogoff as usize..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
let interpreted = InterpretedWalRecord::from_bytes_filtered(

View File

@@ -1295,8 +1295,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
/* chunk offset (#
of pages) into the LFC file */
/* chunk offset (# of pages) into the LFC file */
off_t first_read_offset = (off_t) entry_offset * lfc_blocks_per_chunk;
int nwrite = iov_last_used - first_block_in_chunk_read;
/* offset of first IOV */
@@ -1314,6 +1313,16 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
lfc_disable("read");
return -1;
}
/*
* We successfully read the pages we know were valid when we
* started reading; now mark those pages as read
*/
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
{
if (BITMAP_ISSET(chunk_mask, i))
BITMAP_SET(mask, buf_offset + i);
}
}
/* Place entry to the head of LRU list */
@@ -1331,15 +1340,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
lfc_ctl->time_read += io_time_us;
inc_page_cache_read_wait(io_time_us);
/*
* We successfully read the pages we know were valid when we
* started reading; now mark those pages as read
*/
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
{
if (BITMAP_ISSET(chunk_mask, i))
BITMAP_SET(mask, buf_offset + i);
}
}
CriticalAssert(entry->access_count > 0);

View File

@@ -6,6 +6,7 @@ use std::collections::BTreeMap;
use std::pin::pin;
use std::sync::Mutex;
use futures::future::Either;
use scopeguard::ScopeGuard;
use tokio::sync::oneshot::error::TryRecvError;
@@ -48,67 +49,37 @@ impl<P: QueueProcessing> BatchQueue<P> {
}
}
/// Perform a single request-response process, this may be batched internally.
///
/// This function is not cancel safe.
pub async fn call<R>(
&self,
req: P::Req,
cancelled: impl Future<Output = R>,
) -> Result<P::Res, R> {
pub async fn call(&self, req: P::Req) -> P::Res {
let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req);
let guard = scopeguard::guard(id, move |id| {
let mut inner = self.inner.lock_propagate_poison();
if inner.queue.remove(&id).is_some() {
tracing::debug!("batched task cancelled before completion");
}
});
let mut cancelled = pin!(cancelled);
let resp = loop {
// try become the leader, or try wait for success.
let mut processor = tokio::select! {
// try become leader.
p = self.processor.lock() => p,
// wait for success.
resp = &mut rx => break resp.ok(),
// wait for cancellation.
cancel = cancelled.as_mut() => {
let mut inner = self.inner.lock_propagate_poison();
if inner.queue.remove(&id).is_some() {
tracing::warn!("batched task cancelled before completion");
}
return Err(cancel);
},
let mut processor = match futures::future::select(rx, pin!(self.processor.lock())).await
{
// we got the resp.
Either::Left((resp, _)) => break resp.ok(),
// we are the leader.
Either::Right((p, rx_)) => {
rx = rx_;
p
}
};
tracing::debug!(id, "batch: became leader");
let (reqs, resps) = self.inner.lock_propagate_poison().get_batch(&processor);
// snitch incase the task gets cancelled.
let cancel_safety = scopeguard::guard((), |()| {
if !std::thread::panicking() {
tracing::error!(
id,
"batch: leader cancelled, despite not being cancellation safe"
);
}
});
// apply a batch.
// if this is cancelled, jobs will not be completed and will panic.
let values = processor.apply(reqs).await;
// good: we didn't get cancelled.
ScopeGuard::into_inner(cancel_safety);
if values.len() != resps.len() {
tracing::error!(
"batch: invalid response size, expected={}, got={}",
resps.len(),
values.len()
);
}
// send response values.
for (tx, value) in std::iter::zip(resps, values) {
if tx.send(value).is_err() {
// receiver hung up but that's fine.
}
// sender hung up but that's fine.
drop(tx.send(value));
}
match rx.try_recv() {
@@ -127,9 +98,10 @@ impl<P: QueueProcessing> BatchQueue<P> {
}
};
tracing::debug!(id, "batch: job completed");
// already removed.
ScopeGuard::into_inner(guard);
Ok(resp.expect("no response found. batch processer should not panic"))
resp.expect("no response found. batch processer should not panic")
}
}
@@ -153,8 +125,6 @@ impl<P: QueueProcessing> BatchQueueInner<P> {
self.queue.insert(id, BatchJob { req, res: tx });
tracing::debug!(id, "batch: registered job in the queue");
(id, rx)
}
@@ -162,19 +132,15 @@ impl<P: QueueProcessing> BatchQueueInner<P> {
let batch_size = p.batch_size(self.queue.len());
let mut reqs = Vec::with_capacity(batch_size);
let mut resps = Vec::with_capacity(batch_size);
let mut ids = Vec::with_capacity(batch_size);
while reqs.len() < batch_size {
let Some((id, job)) = self.queue.pop_first() else {
let Some((_, job)) = self.queue.pop_first() else {
break;
};
reqs.push(job.req);
resps.push(job.res);
ids.push(id);
}
tracing::debug!(ids=?ids, "batch: acquired jobs");
(reqs, resps)
}
}

View File

@@ -279,6 +279,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
},
proxy_protocol_v2: config::ProxyProtocolV2::Rejected,
handshake_timeout: Duration::from_secs(10),
region: "local".into(),
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,
connect_compute_locks,
connect_to_compute: compute_config,

View File

@@ -236,6 +236,7 @@ pub(super) async fn task_main(
extra: None,
},
crate::metrics::Protocol::SniRouter,
"sni",
);
handle_client(ctx, dest_suffix, tls_config, compute_tls_config, socket).await
}

View File

@@ -123,6 +123,12 @@ struct ProxyCliArgs {
/// timeout for the TLS handshake
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
handshake_timeout: tokio::time::Duration,
/// http endpoint to receive periodic metric updates
#[clap(long)]
metric_collection_endpoint: Option<String>,
/// how often metrics should be sent to a collection endpoint
#[clap(long)]
metric_collection_interval: Option<String>,
/// cache for `wake_compute` api method (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
wake_compute_cache: String,
@@ -149,31 +155,40 @@ struct ProxyCliArgs {
/// Wake compute rate limiter max number of requests per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
wake_compute_limit: Vec<RateBucketInfo>,
/// Redis rate limiter max number of requests per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// Cancellation channel size (max queue size for redis kv client)
#[clap(long, default_value_t = 1024)]
cancellation_ch_size: usize,
/// Cancellation ops batch size for redis
#[clap(long, default_value_t = 8)]
cancellation_batch_size: usize,
/// redis url for plain authentication
#[clap(long, alias("redis-notifications"))]
redis_plain: Option<String>,
/// what from the available authentications type to use for redis. Supported are "irsa" and "plain".
/// cache for `allowed_ips` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
allowed_ips_cache: String,
/// cache for `role_secret` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
role_secret_cache: String,
/// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections)
#[clap(long)]
redis_notifications: Option<String>,
/// what from the available authentications type to use for the regional redis we have. Supported are "irsa" and "plain".
#[clap(long, default_value = "irsa")]
redis_auth_type: String,
/// redis host for irsa authentication
/// redis host for streaming connections (might be different from the notifications host)
#[clap(long)]
redis_host: Option<String>,
/// redis port for irsa authentication
/// redis port for streaming connections (might be different from the notifications host)
#[clap(long)]
redis_port: Option<u16>,
/// redis cluster name for irsa authentication
/// redis cluster name, used in aws elasticache
#[clap(long)]
redis_cluster_name: Option<String>,
/// redis user_id for irsa authentication
/// redis user_id, used in aws elasticache
#[clap(long)]
redis_user_id: Option<String>,
/// aws region for irsa authentication
/// aws region to retrieve credentials
#[clap(long, default_value_t = String::new())]
aws_region: String,
/// cache for `project_info` (use `size=0` to disable)
@@ -185,12 +200,6 @@ struct ProxyCliArgs {
#[clap(flatten)]
parquet_upload: ParquetUploadArgs,
/// http endpoint to receive periodic metric updates
#[clap(long)]
metric_collection_endpoint: Option<String>,
/// how often metrics should be sent to a collection endpoint
#[clap(long)]
metric_collection_interval: Option<String>,
/// interval for backup metric collection
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
metric_backup_collection_interval: std::time::Duration,
@@ -203,7 +212,6 @@ struct ProxyCliArgs {
/// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
#[clap(long, default_value = "4194304")]
metric_backup_collection_chunk_size: usize,
/// Whether to retry the connection to the compute node
#[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)]
connect_to_compute_retry: String,
@@ -323,7 +331,7 @@ pub async fn run() -> anyhow::Result<()> {
Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
}
info!("Using region: {}", args.aws_region);
let redis_client = configure_redis(&args).await?;
let (regional_redis_client, redis_notifications_client) = configure_redis(&args).await?;
// Check that we can bind to address before further initialization
info!("Starting http on {}", args.http);
@@ -378,6 +386,13 @@ pub async fn run() -> anyhow::Result<()> {
let cancellation_token = CancellationToken::new();
let redis_rps_limit = Vec::leak(args.redis_rps_limit.clone());
RateBucketInfo::validate(redis_rps_limit)?;
let redis_kv_client = regional_redis_client
.as_ref()
.map(|redis_publisher| RedisKVClient::new(redis_publisher.clone(), redis_rps_limit));
let cancellation_handler = Arc::new(CancellationHandler::new(&config.connect_to_compute));
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
@@ -457,7 +472,6 @@ pub async fn run() -> anyhow::Result<()> {
client_tasks.spawn(crate::context::parquet::worker(
cancellation_token.clone(),
args.parquet_upload,
args.region,
));
// maintenance tasks. these never return unless there's an error
@@ -481,17 +495,32 @@ pub async fn run() -> anyhow::Result<()> {
#[cfg_attr(not(any(test, feature = "testing")), expect(irrefutable_let_patterns))]
if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend {
if let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api {
if let Some(client) = redis_client {
// project info cache and invalidation of that cache.
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone()));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
(client1, client2) => {
let cache = api.caches.project_info.clone();
if let Some(client) = client1 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
args.region.clone(),
));
}
if let Some(client) = client2 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
args.region.clone(),
));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
}
// Try to connect to Redis 3 times with 1 + (0..0.1) second interval.
// This prevents immediate exit and pod restart,
// which can cause hammering of the redis in case of connection issues.
// cancellation key management
let mut redis_kv_client = RedisKVClient::new(client.clone());
// Try to connect to Redis 3 times with 1 + (0..0.1) second interval.
// This prevents immediate exit and pod restart,
// which can cause hammering of the redis in case of connection issues.
if let Some(mut redis_kv_client) = redis_kv_client {
for attempt in (0..3).with_position() {
match redis_kv_client.try_connect().await {
Ok(()) => {
@@ -516,12 +545,14 @@ pub async fn run() -> anyhow::Result<()> {
}
}
}
}
// listen for notifications of new projects/endpoints/branches
if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(
async move { cache.do_read(client, cancellation_token.clone()).await }
async move { cache.do_read(con, cancellation_token.clone()).await }
.instrument(span),
);
}
@@ -650,6 +681,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
authentication_config,
proxy_protocol_v2: args.proxy_protocol_v2,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_compute_locks,
connect_to_compute: compute_config,
@@ -811,18 +843,21 @@ fn build_auth_backend(
async fn configure_redis(
args: &ProxyCliArgs,
) -> anyhow::Result<Option<ConnectionWithCredentialsProvider>> {
) -> anyhow::Result<(
Option<ConnectionWithCredentialsProvider>,
Option<ConnectionWithCredentialsProvider>,
)> {
// TODO: untangle the config args
let redis_client = match &*args.redis_auth_type {
"plain" => match &args.redis_plain {
let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
("plain", redis_url) => match redis_url {
None => {
bail!("plain auth requires redis_plain to be set");
bail!("plain auth requires redis_notifications to be set");
}
Some(url) => {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
}
},
"irsa" => match (&args.redis_host, args.redis_port) {
("irsa", _) => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(
ConnectionWithCredentialsProvider::new_with_credentials_provider(
host.clone(),
@@ -846,12 +881,18 @@ async fn configure_redis(
bail!("redis-host and redis-port must be specified together");
}
},
auth_type => {
bail!("unknown auth type {auth_type:?} given")
_ => {
bail!("unknown auth type given");
}
};
Ok(redis_client)
let redis_notifications_client = if let Some(url) = &args.redis_notifications {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(&**url))
} else {
regional_redis_client.clone()
};
Ok((regional_redis_client, redis_notifications_client))
}
#[cfg(test)]

View File

@@ -1,6 +1,5 @@
use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::pin::pin;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
@@ -99,6 +98,7 @@ impl Pipeline {
impl CancelKeyOp {
fn register(&self, pipe: &mut Pipeline) {
#[allow(clippy::used_underscore_binding)]
match self {
CancelKeyOp::StoreCancelKey { key, value, expire } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
@@ -224,7 +224,6 @@ impl CancellationHandler {
}
}
/// This is not cancel safe
async fn get_cancel_key(
&self,
key: CancelKeyData,
@@ -241,21 +240,16 @@ impl CancellationHandler {
};
const TIMEOUT: Duration = Duration::from_secs(5);
let result = timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
)
.await
.map_err(|_| {
tracing::warn!("timed out waiting to receive GetCancelData response");
CancelError::RateLimit
})?
// cannot be cancelled
.unwrap_or_else(|x| match x {})
.map_err(|e| {
tracing::warn!("failed to receive GetCancelData response: {e}");
CancelError::InternalError
})?;
let result = timeout(TIMEOUT, tx.call((guard, op)))
.await
.map_err(|_| {
tracing::warn!("timed out waiting to receive GetCancelData response");
CancelError::RateLimit
})?
.map_err(|e| {
tracing::warn!("failed to receive GetCancelData response: {e}");
CancelError::InternalError
})?;
let cancel_state_str = String::from_owned_redis_value(result).map_err(|e| {
tracing::warn!("failed to receive GetCancelData response: {e}");
@@ -277,8 +271,6 @@ impl CancellationHandler {
/// Will fetch IP allowlist internally.
///
/// return Result primarily for tests
///
/// This is not cancel safe
pub(crate) async fn cancel_session<T: ControlPlaneApi>(
&self,
key: CancelKeyData,
@@ -402,8 +394,6 @@ impl Session {
/// Ensure the cancel key is continously refreshed,
/// but stop when the channel is dropped.
///
/// This is not cancel safe
pub(crate) async fn maintain_cancel_key(
&self,
session_id: uuid::Uuid,
@@ -411,6 +401,27 @@ impl Session {
cancel_closure: &CancelClosure,
compute_config: &ComputeConfig,
) {
futures::future::select(
std::pin::pin!(self.maintain_redis_cancel_key(cancel_closure)),
cancel,
)
.await;
if let Err(err) = cancel_closure
.try_cancel_query(compute_config)
.boxed()
.await
{
tracing::warn!(
?session_id,
?err,
"could not cancel the query in the database"
);
}
}
// Ensure the cancel key is continously refreshed.
async fn maintain_redis_cancel_key(&self, cancel_closure: &CancelClosure) -> ! {
let Some(tx) = self.cancellation_handler.tx.get() else {
tracing::warn!("cancellation handler is not available");
// don't exit, as we only want to exit if cancelled externally.
@@ -421,8 +432,6 @@ impl Session {
.expect("serialising to json string should not fail")
.into_boxed_str();
let mut cancel = pin!(cancel);
loop {
let guard = Metrics::get()
.proxy
@@ -440,35 +449,9 @@ impl Session {
"registering cancellation key"
);
match tx.call((guard, op), cancel.as_mut()).await {
Ok(Ok(_)) => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registered cancellation key"
);
// wait before continuing.
tokio::time::sleep(CANCEL_KEY_REFRESH).await;
}
// retry immediately.
Ok(Err(error)) => {
tracing::warn!(?error, "error registering cancellation key");
}
Err(Err(_cancelled)) => break,
if tx.call((guard, op)).await.is_ok() {
tokio::time::sleep(CANCEL_KEY_REFRESH).await;
}
}
if let Err(err) = cancel_closure
.try_cancel_query(compute_config)
.boxed()
.await
{
tracing::warn!(
?session_id,
?err,
"could not cancel the query in the database"
);
}
}
}

View File

@@ -6,7 +6,7 @@ use std::net::{IpAddr, SocketAddr};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use postgres_client::config::{AuthKeys, ChannelBinding, SslMode};
use postgres_client::config::{AuthKeys, SslMode};
use postgres_client::maybe_tls_stream::MaybeTlsStream;
use postgres_client::tls::MakeTlsConnect;
use postgres_client::{NoTls, RawCancelToken, RawConnection};
@@ -129,8 +129,6 @@ pub(crate) struct AuthInfo {
auth: Option<Auth>,
server_params: StartupMessageParams,
channel_binding: ChannelBinding,
/// Console redirect sets user and database, we shouldn't re-use those from the params.
skip_db_user: bool,
}
@@ -154,8 +152,6 @@ impl AuthInfo {
auth: pw.map(|pw| Auth::Password(pw.as_bytes().to_owned())),
server_params,
skip_db_user: true,
// pg-sni-router is a mitm so this would fail.
channel_binding: ChannelBinding::Disable,
}
}
@@ -169,7 +165,6 @@ impl AuthInfo {
},
server_params: StartupMessageParams::default(),
skip_db_user: false,
channel_binding: ChannelBinding::Prefer,
}
}
}
@@ -192,7 +187,6 @@ impl AuthInfo {
Some(Auth::Password(pw)) => config.password(pw),
None => &mut config,
};
config.channel_binding(self.channel_binding);
for (k, v) in self.server_params.iter() {
config.set_param(k, v);
}
@@ -247,9 +241,7 @@ impl AuthInfo {
let tmp_config = self.enrich(tmp_config);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let connection = tmp_config
.tls_and_authenticate(&mut compute.stream, NoTls)
.await?;
let connection = tmp_config.connect_raw(&mut compute.stream, NoTls).await?;
drop(pause);
let RawConnection {

View File

@@ -22,6 +22,7 @@ pub struct ProxyConfig {
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
pub proxy_protocol_v2: ProxyProtocolV2,
pub region: String,
pub handshake_timeout: Duration,
pub wake_compute_retry_config: RetryConfig,
pub connect_compute_locks: ApiLocks<Host>,

View File

@@ -89,7 +89,12 @@ pub async fn task_main(
}
}
let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp);
let ctx = RequestContext::new(
session_id,
conn_info,
crate::metrics::Protocol::Tcp,
&config.region,
);
let res = handle_client(
config,

View File

@@ -46,6 +46,7 @@ struct RequestContextInner {
pub(crate) session_id: Uuid,
pub(crate) protocol: Protocol,
first_packet: chrono::DateTime<Utc>,
region: &'static str,
pub(crate) span: Span,
// filled in as they are discovered
@@ -93,6 +94,7 @@ impl Clone for RequestContext {
session_id: inner.session_id,
protocol: inner.protocol,
first_packet: inner.first_packet,
region: inner.region,
span: info_span!("background_task"),
project: inner.project,
@@ -122,7 +124,12 @@ impl Clone for RequestContext {
}
impl RequestContext {
pub fn new(session_id: Uuid, conn_info: ConnectionInfo, protocol: Protocol) -> Self {
pub fn new(
session_id: Uuid,
conn_info: ConnectionInfo,
protocol: Protocol,
region: &'static str,
) -> Self {
// TODO: be careful with long lived spans
let span = info_span!(
"connect_request",
@@ -138,6 +145,7 @@ impl RequestContext {
session_id,
protocol,
first_packet: Utc::now(),
region,
span,
project: None,
@@ -171,7 +179,7 @@ impl RequestContext {
let ip = IpAddr::from([127, 0, 0, 1]);
let addr = SocketAddr::new(ip, 5432);
let conn_info = ConnectionInfo { addr, extra: None };
RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp)
RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test")
}
pub(crate) fn console_application_name(&self) -> String {

View File

@@ -74,7 +74,7 @@ pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
#[derive(parquet_derive::ParquetRecordWriter)]
pub(crate) struct RequestData {
region: String,
region: &'static str,
protocol: &'static str,
/// Must be UTC. The derive macro doesn't like the timezones
timestamp: chrono::NaiveDateTime,
@@ -147,7 +147,7 @@ impl From<&RequestContextInner> for RequestData {
}),
jwt_issuer: value.jwt_issuer.clone(),
protocol: value.protocol.as_str(),
region: String::new(),
region: value.region,
error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
success: value.success,
cold_start_info: value.cold_start_info.as_str(),
@@ -167,7 +167,6 @@ impl From<&RequestContextInner> for RequestData {
pub async fn worker(
cancellation_token: CancellationToken,
config: ParquetUploadArgs,
region: String,
) -> anyhow::Result<()> {
let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
tracing::warn!("parquet request upload: no s3 bucket configured");
@@ -233,17 +232,12 @@ pub async fn worker(
.context("remote storage for disconnect events init")?;
let parquet_config_disconnect = parquet_config.clone();
tokio::try_join!(
worker_inner(storage, rx, parquet_config, &region),
worker_inner(
storage_disconnect,
rx_disconnect,
parquet_config_disconnect,
&region
)
worker_inner(storage, rx, parquet_config),
worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect)
)
.map(|_| ())
} else {
worker_inner(storage, rx, parquet_config, &region).await
worker_inner(storage, rx, parquet_config).await
}
}
@@ -263,7 +257,6 @@ async fn worker_inner(
storage: GenericRemoteStorage,
rx: impl Stream<Item = RequestData>,
config: ParquetConfig,
region: &str,
) -> anyhow::Result<()> {
#[cfg(any(test, feature = "testing"))]
let storage = if config.test_remote_failures > 0 {
@@ -284,8 +277,7 @@ async fn worker_inner(
let mut last_upload = time::Instant::now();
let mut len = 0;
while let Some(mut row) = rx.next().await {
region.clone_into(&mut row.region);
while let Some(row) = rx.next().await {
rows.push(row);
let force = last_upload.elapsed() > config.max_duration;
if rows.len() == config.rows_per_group || force {
@@ -541,7 +533,7 @@ mod tests {
auth_method: None,
jwt_issuer: None,
protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
region: String::new(),
region: "us-east-1",
error: None,
success: rng.r#gen(),
cold_start_info: "no",
@@ -573,9 +565,7 @@ mod tests {
.await
.unwrap();
worker_inner(storage, rx, config, "us-east-1")
.await
.unwrap();
worker_inner(storage, rx, config).await.unwrap();
let mut files = WalkDir::new(tmpdir.as_std_path())
.into_iter()

View File

@@ -122,7 +122,12 @@ pub async fn task_main(
}
}
let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp);
let ctx = RequestContext::new(
session_id,
conn_info,
crate::metrics::Protocol::Tcp,
&config.region,
);
let res = handle_client(
config,

View File

@@ -169,7 +169,7 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
.dbname("db")
.password("password")
.ssl_mode(SslMode::Require)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.connect_raw(server, client_config.make_tls_connect()?)
.await?;
proxy.await?
@@ -252,7 +252,7 @@ async fn connect_failure(
.dbname("db")
.password("password")
.ssl_mode(SslMode::Require)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.connect_raw(server, client_config.make_tls_connect()?)
.await
.err()
.context("client shouldn't be able to connect")?;

View File

@@ -199,7 +199,7 @@ async fn handshake_tls_is_enforced_by_proxy() -> anyhow::Result<()> {
.user("john_doe")
.dbname("earth")
.ssl_mode(SslMode::Disable)
.tls_and_authenticate(server, NoTls)
.connect_raw(server, NoTls)
.await
.err() // -> Option<E>
.context("client shouldn't be able to connect")?;
@@ -228,7 +228,7 @@ async fn handshake_tls() -> anyhow::Result<()> {
.user("john_doe")
.dbname("earth")
.ssl_mode(SslMode::Require)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.connect_raw(server, client_config.make_tls_connect()?)
.await?;
proxy.await?
@@ -245,7 +245,7 @@ async fn handshake_raw() -> anyhow::Result<()> {
.dbname("earth")
.set_param("options", "project=generic-project-name")
.ssl_mode(SslMode::Prefer)
.tls_and_authenticate(server, NoTls)
.connect_raw(server, NoTls)
.await?;
proxy.await?
@@ -293,7 +293,7 @@ async fn scram_auth_good(#[case] password: &str) -> anyhow::Result<()> {
.dbname("db")
.password(password)
.ssl_mode(SslMode::Require)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.connect_raw(server, client_config.make_tls_connect()?)
.await?;
proxy.await?
@@ -317,7 +317,7 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
.dbname("db")
.password("password")
.ssl_mode(SslMode::Require)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.connect_raw(server, client_config.make_tls_connect()?)
.await?;
proxy.await?
@@ -344,7 +344,7 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
.dbname("db")
.password(&password) // no password will match the mocked secret
.ssl_mode(SslMode::Require)
.tls_and_authenticate(server, client_config.make_tls_connect()?)
.connect_raw(server, client_config.make_tls_connect()?)
.await
.err() // -> Option<E>
.context("client shouldn't be able to connect")?;

View File

@@ -139,6 +139,12 @@ impl RateBucketInfo {
Self::new(200, Duration::from_secs(600)),
];
// For all the sessions will be cancel key. So this limit is essentially global proxy limit.
pub const DEFAULT_REDIS_SET: [Self; 2] = [
Self::new(100_000, Duration::from_secs(1)),
Self::new(50_000, Duration::from_secs(10)),
];
pub fn rps(&self) -> f64 {
(self.max_rpi as f64) / self.interval.as_secs_f64()
}

View File

@@ -23,9 +23,10 @@ impl KeyPrefix {
#[cfg(test)]
mod tests {
use super::*;
use crate::pqproto::id_to_cancel_key;
use super::*;
#[test]
fn test_build_redis_key() {
let cancel_key: KeyPrefix = KeyPrefix::Cancel(id_to_cancel_key(12345 << 32 | 54321));

View File

@@ -5,9 +5,11 @@ use redis::aio::ConnectionLike;
use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
pub struct RedisKVClient {
client: ConnectionWithCredentialsProvider,
limiter: GlobalRateLimiter,
}
#[allow(async_fn_in_trait)]
@@ -28,8 +30,11 @@ impl Queryable for Cmd {
}
impl RedisKVClient {
pub fn new(client: ConnectionWithCredentialsProvider) -> Self {
Self { client }
pub fn new(client: ConnectionWithCredentialsProvider, info: &'static [RateBucketInfo]) -> Self {
Self {
client,
limiter: GlobalRateLimiter::new(info.into()),
}
}
pub async fn try_connect(&mut self) -> anyhow::Result<()> {
@@ -44,6 +49,11 @@ impl RedisKVClient {
&mut self,
q: &impl Queryable,
) -> anyhow::Result<T> {
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping query");
return Err(anyhow::anyhow!("Rate limit exceeded"));
}
let e = match q.query(&mut self.client).await {
Ok(t) => return Ok(t),
Err(e) => e,

View File

@@ -141,19 +141,29 @@ where
struct MessageHandler<C: ProjectInfoCache + Send + Sync + 'static> {
cache: Arc<C>,
region_id: String,
}
impl<C: ProjectInfoCache + Send + Sync + 'static> Clone for MessageHandler<C> {
fn clone(&self) -> Self {
Self {
cache: self.cache.clone(),
region_id: self.region_id.clone(),
}
}
}
impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
pub(crate) fn new(cache: Arc<C>) -> Self {
Self { cache }
pub(crate) fn new(cache: Arc<C>, region_id: String) -> Self {
Self { cache, region_id }
}
pub(crate) async fn increment_active_listeners(&self) {
self.cache.increment_active_listeners().await;
}
pub(crate) async fn decrement_active_listeners(&self) {
self.cache.decrement_active_listeners().await;
}
#[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))]
@@ -266,7 +276,7 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
}
let mut conn = match try_connect(&redis).await {
Ok(conn) => {
handler.cache.increment_active_listeners().await;
handler.increment_active_listeners().await;
conn
}
Err(e) => {
@@ -287,11 +297,11 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
}
}
if cancellation_token.is_cancelled() {
handler.cache.decrement_active_listeners().await;
handler.decrement_active_listeners().await;
return Ok(());
}
}
handler.cache.decrement_active_listeners().await;
handler.decrement_active_listeners().await;
}
}
@@ -300,11 +310,12 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
pub async fn task_main<C>(
redis: ConnectionWithCredentialsProvider,
cache: Arc<C>,
region_id: String,
) -> anyhow::Result<Infallible>
where
C: ProjectInfoCache + Send + Sync + 'static,
{
let handler = MessageHandler::new(cache);
let handler = MessageHandler::new(cache, region_id);
// 6h - 1m.
// There will be 1 minute overlap between two tasks. But at least we can be sure that no message is lost.
let mut interval = tokio::time::interval(std::time::Duration::from_secs(6 * 60 * 60 - 60));

View File

@@ -417,7 +417,12 @@ async fn request_handler(
if config.http_config.accept_websockets
&& framed_websockets::upgrade::is_upgrade_request(&request)
{
let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Ws);
let ctx = RequestContext::new(
session_id,
conn_info,
crate::metrics::Protocol::Ws,
&config.region,
);
ctx.set_user_agent(
request
@@ -457,7 +462,12 @@ async fn request_handler(
// Return the response so the spawned future can continue.
Ok(response.map(|b| b.map_err(|x| match x {}).boxed()))
} else if request.uri().path() == "/sql" && *request.method() == Method::POST {
let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
let ctx = RequestContext::new(
session_id,
conn_info,
crate::metrics::Protocol::Http,
&config.region,
);
let span = ctx.span();
let testodrome_id = request

View File

@@ -353,21 +353,6 @@ async fn main() -> anyhow::Result<()> {
}
};
if !args.dev {
let http_auth_enabled = args.http_auth_public_key_path.is_some();
let pg_auth_enabled = args.pg_auth_public_key_path.is_some();
let pg_tenant_only_auth_enabled = args.pg_tenant_only_auth_public_key_path.is_some();
if !http_auth_enabled || !pg_auth_enabled || !pg_tenant_only_auth_enabled {
bail!(
"Safekeeper refuses to start with HTTP, PostgreSQL, or tenant-only PostgreSQL API authentication disabled.\n\
Run with --dev to allow running without authentication.\n\
This is insecure and should only be used in development environments."
);
}
} else {
warn!("Starting in dev mode: this may be an insecure configuration.");
}
// Load JWT auth token to connect to other safekeepers for pull_timeline.
let sk_auth_token = if let Some(auth_token_path) = args.auth_token_path.as_ref() {
info!("loading JWT token for authentication with safekeepers from {auth_token_path}");

View File

@@ -9,7 +9,7 @@ use tokio::fs::OpenOptions;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tracing::{info, warn};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use crate::GlobalTimelines;
use crate::control_file::FileStorage;
@@ -100,7 +100,7 @@ pub async fn handle_request(
}
}
let wal_seg_size = state.server.wal_seg_size as usize;
let wal_seg_size = state.server.wal_seg_size;
if wal_seg_size == 0 {
bail!("wal_seg_size is not set");
}
@@ -171,7 +171,7 @@ pub async fn handle_request(
async fn copy_disk_segments(
tli: &WalResidentTimeline,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
start_lsn: Lsn,
end_lsn: Lsn,
tli_dir_path: &Utf8PathBuf,

View File

@@ -103,7 +103,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let server_info = ServerInfo {
pg_version: request_data.pg_version,
system_id: request_data.system_id.unwrap_or(0),
wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE),
};
let global_timelines = get_global_timelines(&request);
global_timelines

View File

@@ -831,7 +831,7 @@ impl Collector for TimelineCollector {
if tli.last_removed_segno != 0 {
let segno_count = tli
.flush_lsn
.segment_number(tli.persisted_state.server.wal_seg_size as usize)
.segment_number(tli.persisted_state.server.wal_seg_size)
- tli.last_removed_segno;
let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
self.disk_usage

View File

@@ -27,7 +27,7 @@ use tracing::{error, info, instrument};
use utils::crashsafe::fsync_async_opt;
use utils::id::{NodeId, TenantTimelineId};
use utils::logging::SecretString;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use utils::pausable_failpoint;
use crate::control_file::CONTROL_FILE_NAME;
@@ -100,7 +100,7 @@ pub struct SnapshotContext {
pub term: Term,
pub last_log_term: Term,
pub flush_lsn: Lsn,
pub wal_seg_size: usize,
pub wal_seg_size: WalSegmentSize,
// used to remove WAL hold off in Drop.
pub tli: WalResidentTimeline,
}

View File

@@ -1439,7 +1439,7 @@ mod tests {
fn test_sk_state() -> TimelinePersistentState {
let mut state = TimelinePersistentState::empty();
state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
state.server.wal_seg_size = WAL_SEGMENT_SIZE;
state.tenant_id = TenantId::from([1u8; 16]);
state.timeline_id = TimelineId::from([1u8; 16]);
state

View File

@@ -152,7 +152,7 @@ impl TimelinePersistentState {
ServerInfo {
pg_version: PgVersionId::from(PgMajorVersion::PG17),
system_id: 0, /* Postgres system identifier */
wal_seg_size: WAL_SEGMENT_SIZE as u32,
wal_seg_size: WAL_SEGMENT_SIZE,
},
Lsn::INVALID,
Lsn::INVALID,

View File

@@ -23,7 +23,7 @@ use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::{NodeId, TenantId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use utils::sync::gate::Gate;
use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
@@ -338,8 +338,8 @@ impl SharedState {
Ok(Self::new(sk))
}
pub(crate) fn get_wal_seg_size(&self) -> usize {
self.sk.state().server.wal_seg_size as usize
pub(crate) fn get_wal_seg_size(&self) -> WalSegmentSize {
self.sk.state().server.wal_seg_size
}
fn get_safekeeper_info(
@@ -747,7 +747,7 @@ impl Timeline {
}
/// Returns wal_seg_size.
pub async fn get_wal_seg_size(&self) -> usize {
pub async fn get_wal_seg_size(&self) -> WalSegmentSize {
self.read_shared_state().await.get_wal_seg_size()
}

View File

@@ -11,6 +11,7 @@ use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tracing::{debug, info, instrument, warn};
use utils::crashsafe::durable_rename;
use utils::lsn::WalSegmentSize;
use crate::metrics::{
EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, EvictionEvent, NUM_EVICTED_TIMELINES,
@@ -276,12 +277,12 @@ async fn compare_local_segment_with_remote(
async fn do_validation(
mgr: &Manager,
file: &mut File,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
let local_size = file.metadata().await?.len() as usize;
if local_size != wal_seg_size {
let local_size = file.metadata().await?.len();
if WalSegmentSize::try_from(local_size) != Ok(wal_seg_size) {
anyhow::bail!(
"local segment size is invalid: found {}, expected {}",
local_size,
@@ -296,12 +297,12 @@ async fn do_validation(
// remote segment should have bytes excatly up to `flush_lsn`
let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
// let's compare the first `expected_remote_size` bytes
compare_n_bytes(&mut remote_reader, file, expected_remote_size).await?;
compare_n_bytes(&mut remote_reader, file, expected_remote_size as usize).await?;
// and check that the remote segment ends here
check_end(&mut remote_reader).await?;
// if local segment is longer, the rest should be zeroes
read_n_zeroes(file, mgr.wal_seg_size - expected_remote_size).await?;
read_n_zeroes(file, (mgr.wal_seg_size - expected_remote_size) as usize).await?;
// and check that the local segment ends here
check_end(file).await?;

View File

@@ -20,7 +20,7 @@ use tokio::task::{JoinError, JoinHandle};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, info, info_span, instrument, warn};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use crate::SafeKeeperConf;
use crate::control_file::{FileStorage, Storage};
@@ -198,7 +198,7 @@ pub(crate) struct Manager {
// configuration & dependencies
pub(crate) tli: ManagerTimeline,
pub(crate) conf: SafeKeeperConf,
pub(crate) wal_seg_size: usize,
pub(crate) wal_seg_size: WalSegmentSize,
pub(crate) walsenders: Arc<WalSenders>,
pub(crate) wal_backup: Arc<WalBackup>,

View File

@@ -23,7 +23,7 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use utils::{backoff, pausable_failpoint};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
@@ -52,7 +52,7 @@ impl WalBackupTaskHandle {
/// Do we have anything to upload to S3, i.e. should safekeepers run backup activity?
pub(crate) fn is_wal_backup_required(
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
num_computes: usize,
state: &StateSnapshot,
) -> bool {
@@ -210,7 +210,7 @@ impl WalBackup {
struct WalBackupTask {
timeline: WalResidentTimeline,
timeline_dir: Utf8PathBuf,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
parallel_jobs: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
storage: Arc<GenericRemoteStorage>,
@@ -338,7 +338,7 @@ async fn backup_lsn_range(
storage: Arc<GenericRemoteStorage>,
backup_lsn: &mut Lsn,
end_lsn: Lsn,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
timeline_dir: &Utf8Path,
parallel_jobs: usize,
) -> Result<()> {
@@ -461,12 +461,12 @@ impl Segment {
remote_timeline_path.join(self.object_name())
}
pub fn size(self) -> usize {
(u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
pub fn size(self) -> WalSegmentSize {
(u64::from(self.end_lsn) - u64::from(self.start_lsn)) as WalSegmentSize
}
}
fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
fn get_segments(start: Lsn, end: Lsn, seg_size: WalSegmentSize) -> Vec<Segment> {
let first_seg = start.segment_number(seg_size);
let last_seg = end.segment_number(seg_size);
@@ -484,7 +484,7 @@ async fn backup_object(
storage: &GenericRemoteStorage,
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
size: WalSegmentSize,
) -> Result<()> {
let file = File::open(&source_file)
.await
@@ -495,7 +495,7 @@ async fn backup_object(
let cancel = CancellationToken::new();
storage
.upload_storage_object(file, size, target_file, &cancel)
.upload_storage_object(file, size as usize, target_file, &cancel)
.await
}
@@ -503,7 +503,7 @@ pub(crate) async fn backup_partial_segment(
storage: &GenericRemoteStorage,
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
size: WalSegmentSize,
) -> Result<()> {
let file = File::open(&source_file)
.await
@@ -519,7 +519,7 @@ pub(crate) async fn backup_partial_segment(
storage
.upload(
file,
size,
size as usize,
target_file,
Some(StorageMetadata::from([("sk_type", "partial_segment")])),
&cancel,
@@ -647,7 +647,7 @@ pub async fn delete_objects(storage: &GenericRemoteStorage, paths: &[RemotePath]
/// Copy segments from one timeline to another. Used in copy_timeline.
pub async fn copy_s3_segments(
storage: &GenericRemoteStorage,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
src_ttid: &TenantTimelineId,
dst_ttid: &TenantTimelineId,
from_segment: XLogSegNo,

View File

@@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use utils::id::NodeId;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use crate::SafeKeeperConf;
use crate::metrics::{
@@ -151,7 +151,7 @@ impl State {
}
pub struct PartialBackup {
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
tli: WalResidentTimeline,
conf: SafeKeeperConf,
local_prefix: Utf8PathBuf,

View File

@@ -28,7 +28,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tracing::*;
use utils::crashsafe::durable_rename;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use crate::metrics::{
REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics, time_io_closure,
@@ -92,7 +92,7 @@ pub struct PhysicalStorage {
no_sync: bool,
/// Size of WAL segment in bytes.
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
pg_version: PgVersionId,
system_id: u64,
@@ -170,7 +170,7 @@ impl PhysicalStorage {
state: &TimelinePersistentState,
no_sync: bool,
) -> Result<PhysicalStorage> {
let wal_seg_size = state.server.wal_seg_size as usize;
let wal_seg_size = state.server.wal_seg_size;
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
@@ -315,7 +315,12 @@ impl PhysicalStorage {
/// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the
/// segment was completed, closed, and flushed to disk.
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<bool> {
async fn write_in_segment(
&mut self,
segno: u64,
xlogoff: WalSegmentSize,
buf: &[u8],
) -> Result<bool> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
@@ -331,7 +336,7 @@ impl PhysicalStorage {
// syscall, but needed in case of async). It does *not* fsyncs the file.
file.flush().await?;
if xlogoff + buf.len() == self.wal_seg_size {
if xlogoff as usize + buf.len() == self.wal_seg_size as usize {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&file).await?;
@@ -372,8 +377,8 @@ impl PhysicalStorage {
let segno = self.write_lsn.segment_number(self.wal_seg_size);
// If crossing a WAL boundary, only write up until we reach wal segment size.
let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
self.wal_seg_size - xlogoff
let bytes_write = if xlogoff as usize + buf.len() > self.wal_seg_size as usize {
(self.wal_seg_size - xlogoff) as usize
} else {
buf.len()
};
@@ -604,7 +609,7 @@ impl Storage for PhysicalStorage {
/// Remove all WAL segments in timeline_dir that match the given predicate.
async fn remove_segments_from_disk(
timeline_dir: &Utf8Path,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
remove_predicate: impl Fn(XLogSegNo) -> bool,
) -> Result<()> {
let _timer = WAL_STORAGE_OPERATION_SECONDS
@@ -645,7 +650,7 @@ async fn remove_segments_from_disk(
pub struct WalReader {
remote_path: RemotePath,
timeline_dir: Utf8PathBuf,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
pos: Lsn,
wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
@@ -683,7 +688,7 @@ impl WalReader {
if start_pos
< state
.timeline_start_lsn
.segment_lsn(state.server.wal_seg_size as usize)
.segment_lsn(state.server.wal_seg_size)
{
bail!(
"Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
@@ -695,7 +700,7 @@ impl WalReader {
Ok(Self {
remote_path: remote_timeline_path(ttid)?,
timeline_dir,
wal_seg_size: state.server.wal_seg_size as usize,
wal_seg_size: state.server.wal_seg_size,
pos: start_pos,
wal_segment: None,
wal_backup,
@@ -743,12 +748,14 @@ impl WalReader {
// How many bytes may we consume in total?
let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
debug_assert!(seg_bytes.len() > pos_seg_offset);
debug_assert!(seg_bytes.len() > tl_start_seg_offset);
debug_assert!(seg_bytes.len() > pos_seg_offset as usize);
debug_assert!(seg_bytes.len() > tl_start_seg_offset as usize);
// Copy as many bytes as possible into the buffer
let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
let len = ((tl_start_seg_offset - pos_seg_offset) as usize).min(buf.len());
buf[0..len].copy_from_slice(
&seg_bytes[pos_seg_offset as usize..pos_seg_offset as usize + len],
);
self.pos += len as u64;
@@ -770,7 +777,7 @@ impl WalReader {
// How much to read and send in message? We cannot cross the WAL file
// boundary, and we don't want send more than provided buffer.
let xlogoff = self.pos.segment_offset(self.wal_seg_size);
let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
let send_size = min(buf.len(), (self.wal_seg_size - xlogoff) as usize);
// Read some data from the file.
let buf = &mut buf[0..send_size];
@@ -831,7 +838,7 @@ impl WalReader {
pub(crate) async fn open_wal_file(
timeline_dir: &Utf8Path,
segno: XLogSegNo,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
) -> Result<(tokio::fs::File, bool)> {
let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size);
@@ -858,7 +865,7 @@ pub(crate) async fn open_wal_file(
pub fn wal_file_paths(
timeline_dir: &Utf8Path,
segno: XLogSegNo,
wal_seg_size: usize,
wal_seg_size: WalSegmentSize,
) -> (Utf8PathBuf, Utf8PathBuf) {
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = timeline_dir.join(wal_file_name.clone());

View File

@@ -27,7 +27,6 @@ governor.workspace = true
hex.workspace = true
hyper0.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
json-structural-diff.workspace = true
lasso.workspace = true
@@ -35,7 +34,6 @@ once_cell.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
posthog_client_lite.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true

View File

@@ -14,13 +14,11 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
use hyper0::Uri;
use metrics::BuildInfo;
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::config::PostHogConfig;
use reqwest::Certificate;
use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::feature_flag::FeatureFlagService;
use storage_controller::service::{
Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
@@ -254,8 +252,6 @@ struct Secrets {
peer_jwt_token: Option<String>,
}
const POSTHOG_CONFIG_ENV: &str = "POSTHOG_CONFIG";
impl Secrets {
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
@@ -413,18 +409,6 @@ async fn async_main() -> anyhow::Result<()> {
None => Vec::new(),
};
let posthog_config = if let Ok(json) = std::env::var(POSTHOG_CONFIG_ENV) {
let res: Result<PostHogConfig, _> = serde_json::from_str(&json);
if let Ok(config) = res {
Some(config)
} else {
tracing::warn!("Invalid posthog config: {json}");
None
}
} else {
None
};
let config = Config {
pageserver_jwt_token: secrets.pageserver_jwt_token,
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
@@ -471,7 +455,6 @@ async fn async_main() -> anyhow::Result<()> {
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
use_local_compute_notifications: args.use_local_compute_notifications,
timeline_safekeeper_count: args.timeline_safekeeper_count,
posthog_config: posthog_config.clone(),
#[cfg(feature = "testing")]
kick_secondary_downloads: args.kick_secondary_downloads,
};
@@ -554,23 +537,6 @@ async fn async_main() -> anyhow::Result<()> {
)
});
let feature_flag_task = if let Some(posthog_config) = posthog_config {
let service = service.clone();
let cancel = CancellationToken::new();
let cancel_bg = cancel.clone();
let task = tokio::task::spawn(
async move {
let feature_flag_service = FeatureFlagService::new(service, posthog_config);
let feature_flag_service = Arc::new(feature_flag_service);
feature_flag_service.run(cancel_bg).await
}
.instrument(tracing::info_span!("feature_flag_service")),
);
Some((task, cancel))
} else {
None
};
// Wait until we receive a signal
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
@@ -618,12 +584,6 @@ async fn async_main() -> anyhow::Result<()> {
chaos_jh.await.ok();
}
// If we were running the feature flag service, stop that so that we're not calling into Service while it shuts down
if let Some((feature_flag_task, feature_flag_cancel)) = feature_flag_task {
feature_flag_cancel.cancel();
feature_flag_task.await.ok();
}
service.shutdown().await;
tracing::info!("Service shutdown complete");

View File

@@ -376,13 +376,4 @@ impl PageserverClient {
.await
)
}
pub(crate) async fn update_feature_flag_spec(&self, spec: String) -> Result<()> {
measured_request!(
"update_feature_flag_spec",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.update_feature_flag_spec(spec).await
)
}
}

View File

@@ -1,6 +1,5 @@
pub mod chaos_injector;
mod context_iterator;
pub mod feature_flag;
pub(crate) mod safekeeper_reconciler;
mod safekeeper_service;
@@ -26,7 +25,6 @@ use futures::stream::FuturesUnordered;
use http_utils::error::ApiError;
use hyper::Uri;
use itertools::Itertools;
use pageserver_api::config::PostHogConfig;
use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
@@ -473,9 +471,6 @@ pub struct Config {
/// Safekeepers will be choosen from different availability zones.
pub timeline_safekeeper_count: i64,
/// PostHog integration config
pub posthog_config: Option<PostHogConfig>,
#[cfg(feature = "testing")]
pub kick_secondary_downloads: bool,
}

View File

@@ -1,117 +0,0 @@
use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use pageserver_api::config::PostHogConfig;
use pageserver_client::mgmt_api;
use posthog_client_lite::{PostHogClient, PostHogClientConfig};
use reqwest::StatusCode;
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;
use crate::{pageserver_client::PageserverClient, service::Service};
pub struct FeatureFlagService {
service: Arc<Service>,
config: PostHogConfig,
client: PostHogClient,
http_client: reqwest::Client,
}
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
impl FeatureFlagService {
pub fn new(service: Arc<Service>, config: PostHogConfig) -> Self {
let client = PostHogClient::new(PostHogClientConfig {
project_id: config.project_id.clone(),
server_api_key: config.server_api_key.clone(),
client_api_key: config.client_api_key.clone(),
private_api_url: config.private_api_url.clone(),
public_api_url: config.public_api_url.clone(),
});
Self {
service,
config,
client,
http_client: reqwest::Client::new(),
}
}
async fn refresh(self: Arc<Self>, cancel: CancellationToken) -> Result<(), anyhow::Error> {
let nodes = {
let inner = self.service.inner.read().unwrap();
inner.nodes.clone()
};
let feature_flag_spec = self.client.get_feature_flags_local_evaluation_raw().await?;
let stream = futures::stream::iter(nodes.values().cloned()).map(|node| {
let this = self.clone();
let feature_flag_spec = feature_flag_spec.clone();
async move {
let res = async {
let client = PageserverClient::new(
node.get_id(),
this.http_client.clone(),
node.base_url(),
// TODO: what if we rotate the token during storcon lifetime?
this.service.config.pageserver_jwt_token.as_deref(),
);
client.update_feature_flag_spec(feature_flag_spec).await?;
tracing::info!(
"Updated {}({}) with feature flag spec",
node.get_id(),
node.base_url()
);
Ok::<_, mgmt_api::Error>(())
};
if let Err(e) = res.await {
if let mgmt_api::Error::ApiError(status, _) = e {
if status == StatusCode::NOT_FOUND {
// This is expected during deployments where the API is not available, so we can ignore it
return;
}
}
tracing::warn!(
"Failed to update feature flag spec for {}: {e}",
node.get_id()
);
}
}
});
let mut stream = stream.buffer_unordered(8);
while stream.next().await.is_some() {
if cancel.is_cancelled() {
return Ok(());
}
}
Ok(())
}
pub async fn run(self: Arc<Self>, cancel: CancellationToken) {
let refresh_interval = self
.config
.refresh_interval
.unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL);
let mut interval = tokio::time::interval(refresh_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
tracing::info!(
"Starting feature flag service with refresh interval: {:?}",
refresh_interval
);
loop {
tokio::select! {
_ = interval.tick() => {}
_ = cancel.cancelled() => {
break;
}
}
let res = self.clone().refresh(cancel.clone()).await;
if let Err(e) = res {
tracing::error!("Failed to refresh feature flags: {e:#?}");
}
}
}
}

View File

@@ -13,7 +13,7 @@ use serde::Serialize;
use tokio_postgres::types::PgLsn;
use tracing::{debug, error, info};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, WalSegmentSize};
use crate::cloud_admin_api::CloudAdminApiClient;
use crate::metadata_stream::stream_listing;
@@ -22,7 +22,7 @@ use crate::{
};
/// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
const WAL_SEGSIZE: WalSegmentSize = 16 * 1024 * 1024;
#[derive(Serialize)]
pub struct MetadataSummary {

View File

@@ -118,8 +118,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
if sys.platform != "linux"
else []
),
# Tests run in dev mode
".*Starting in dev mode.*",
)

View File

@@ -671,6 +671,12 @@ def test_layer_download_cancelled_by_config_location(neon_env_builder: NeonEnvBu
"""
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
# On the new mode, the test runs into a cancellation issue, i.e. the walproposer can't shut down
# as it is hang-waiting on the timeline_checkpoint call in WalIngest::new.
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": False,
}
# turn off background tasks so that they don't interfere with the downloads
env = neon_env_builder.init_start(
initial_tenant_conf={

View File

@@ -1457,7 +1457,6 @@ class SafekeeperEnv:
str(i),
"--broker-endpoint",
self.fake_broker_endpoint,
"--dev",
]
log.info(f'Running command "{" ".join(cmd)}"')