Compare commits

..

7 Commits

Author SHA1 Message Date
Conrad Ludgate
4283e4c19e use smallvec and add comment about memory usage 2024-01-23 09:34:34 +00:00
Conrad Ludgate
565d2b2494 fix ip tests 2024-01-22 16:49:35 +00:00
Conrad Ludgate
d5db782f26 fix mock 2024-01-22 15:17:06 +00:00
Conrad Ludgate
2f0e3428bb no error string 2024-01-22 15:00:45 +00:00
Conrad Ludgate
3928d3bc8a eager parsing of ip addr 2024-01-22 14:53:50 +00:00
Conrad Ludgate
e7d36cc41f fix tests 2024-01-22 14:36:36 +00:00
Conrad Ludgate
5e150c9376 switch to lru 2024-01-22 14:28:50 +00:00
41 changed files with 648 additions and 1116 deletions

4
Cargo.lock generated
View File

@@ -3973,6 +3973,7 @@ dependencies = [
"serde",
"serde_json",
"sha2",
"smallvec",
"smol_str",
"socket2 0.5.5",
"sync_wrapper",
@@ -5107,6 +5108,9 @@ name = "smallvec"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
dependencies = [
"serde",
]
[[package]]
name = "smol_str"

View File

@@ -700,14 +700,13 @@ impl ComputeNode {
// In this case we need to connect with old `zenith_admin` name
// and create new user. We cannot simply rename connected user,
// but we can create a new one and grant it all privileges.
let connstr = self.connstr.clone();
let mut client = match Client::connect(connstr.as_str(), NoTls) {
let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
Err(e) => {
info!(
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
e
);
let mut zenith_admin_connstr = connstr.clone();
let mut zenith_admin_connstr = self.connstr.clone();
zenith_admin_connstr
.set_username("zenith_admin")
@@ -720,8 +719,8 @@ impl ComputeNode {
client.simple_query("GRANT zenith_admin TO cloud_admin")?;
drop(client);
// reconnect with connstring with expected name
Client::connect(connstr.as_str(), NoTls)?
// reconnect with connstring with expected name
Client::connect(self.connstr.as_str(), NoTls)?
}
Ok(client) => client,
};
@@ -735,8 +734,8 @@ impl ComputeNode {
cleanup_instance(&mut client)?;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, connstr.as_str(), &mut client)?;
handle_grants(spec, &mut client, connstr.as_str())?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, &mut client, self.connstr.as_str())?;
handle_extensions(spec, &mut client)?;
handle_extension_neon(&mut client)?;
create_availability_check_data(&mut client)?;
@@ -744,12 +743,6 @@ impl ComputeNode {
// 'Close' connection
drop(client);
if self.has_feature(ComputeFeature::Migrations) {
thread::spawn(move || {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
handle_migrations(&mut client)
});
}
Ok(())
}
@@ -814,10 +807,6 @@ impl ComputeNode {
handle_grants(&spec, &mut client, self.connstr.as_str())?;
handle_extensions(&spec, &mut client)?;
handle_extension_neon(&mut client)?;
// We can skip handle_migrations here because a new migration can only appear
// if we have a new version of the compute_ctl binary, which can only happen
// if compute got restarted, in which case we'll end up inside of apply_config
// instead of reconfigure.
}
// 'Close' connection

View File

@@ -727,79 +727,3 @@ pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
Ok(())
}
/// Applies the built-in list of migrations that have not been run yet.
///
/// Bookkeeping is kept in `neon_migration.migration_id`: the schema, table and
/// its single row are created if missing, and the stored `id` is read as the
/// number of migrations already applied. Every migration from that index on is
/// executed between `BEGIN` and `COMMIT`, after which `id` is set to the
/// length of the migration list.
///
/// # Errors
///
/// Returns the first error reported by `client` for any of the queries.
#[instrument(skip_all)]
pub fn handle_migrations(client: &mut Client) -> Result<()> {
    info!("handle migrations");

    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
    // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    let migrations = [
        "ALTER ROLE neon_superuser BYPASSRLS",
        r#"
DO $$
DECLARE
role_name text;
BEGIN
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
END LOOP;
FOR role_name IN SELECT rolname FROM pg_roles
WHERE
NOT pg_has_role(rolname, 'neon_superuser', 'member') AND NOT starts_with(rolname, 'pg_')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';
END LOOP;
END $$;
"#,
    ];

    // Make sure the bookkeeping schema/table/row exist before reading them.
    let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
    client.simple_query(query)?;
    query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
    client.simple_query(query)?;
    query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
    client.simple_query(query)?;
    query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
    client.simple_query(query)?;
    query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
    client.simple_query(query)?;

    query = "SELECT id FROM neon_migration.migration_id";
    let row = client.query_one(query, &[])?;
    // The stored id is used as an index into `migrations`.
    // NOTE(review): the `as` cast does not reject a negative stored id — confirm
    // whether that state should be reported as an error instead.
    let starting_migration_id = row.get::<&str, i64>("id") as usize;

    query = "BEGIN";
    client.simple_query(query)?;

    // `skip` yields nothing when the stored id is already at or past the end
    // of the list, so no indexing can go out of bounds.
    for &migration in migrations.iter().skip(starting_migration_id) {
        info!("Running migration:\n{}\n", migration);
        client.simple_query(migration)?;
    }

    let setval = format!(
        "UPDATE neon_migration.migration_id SET id={}",
        migrations.len()
    );
    client.simple_query(&setval)?;

    query = "COMMIT";
    client.simple_query(query)?;

    // The stored id can exceed the list length (e.g. written by a binary that
    // knew more migrations); a plain subtraction would underflow here.
    info!(
        "Ran {} migrations",
        migrations.len().saturating_sub(starting_migration_id)
    );
    Ok(())
}

View File

@@ -184,7 +184,7 @@ impl Persistence {
pub(crate) async fn increment_generation(
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
node_id: Option<NodeId>,
) -> anyhow::Result<Generation> {
let (write, gen) = {
let mut locked = self.state.lock().unwrap();
@@ -192,9 +192,14 @@ impl Persistence {
anyhow::bail!("Tried to increment generation of unknown shard");
};
shard.generation += 1;
shard.generation_pageserver = Some(node_id);
// If we're called with a None pageserver, we need only update the generation
// record to disassociate it from this pageserver, not actually increment the number, as
// the increment is guaranteed to happen the next time this tenant is attached.
if node_id.is_some() {
shard.generation += 1;
}
shard.generation_pageserver = node_id;
let gen = Generation::new(shard.generation);
(locked.save(), gen)
};
@@ -203,19 +208,6 @@ impl Persistence {
Ok(gen)
}
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
let write = {
let mut locked = self.state.lock().unwrap();
let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
anyhow::bail!("Tried to increment generation of unknown shard");
};
shard.generation_pageserver = None;
locked.save()
};
write.commit().await?;
Ok(())
}
pub(crate) async fn re_attach(
&self,
node_id: NodeId,

View File

@@ -296,7 +296,7 @@ impl Reconciler {
// Increment generation before attaching to new pageserver
self.generation = self
.persistence
.increment_generation(self.tenant_shard_id, dest_ps_id)
.increment_generation(self.tenant_shard_id, Some(dest_ps_id))
.await?;
let dest_conf = build_location_config(
@@ -395,7 +395,7 @@ impl Reconciler {
// as locations with unknown (None) observed state.
self.generation = self
.persistence
.increment_generation(self.tenant_shard_id, node_id)
.increment_generation(self.tenant_shard_id, Some(node_id))
.await?;
wanted_conf.generation = self.generation.into();
tracing::info!("Observed configuration requires update.");

View File

@@ -362,14 +362,13 @@ impl Service {
);
}
let new_generation = if let Some(req_node_id) = attach_req.node_id {
let new_generation = if attach_req.node_id.is_some() {
Some(
self.persistence
.increment_generation(attach_req.tenant_shard_id, req_node_id)
.increment_generation(attach_req.tenant_shard_id, attach_req.node_id)
.await?,
)
} else {
self.persistence.detach(attach_req.tenant_shard_id).await?;
None
};
@@ -408,7 +407,6 @@ impl Service {
"attach_hook: tenant {} set generation {:?}, pageserver {}",
attach_req.tenant_shard_id,
tenant_state.generation,
// TODO: this is an odd number of 0xf's
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
);

View File

@@ -57,7 +57,7 @@ use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use compute_api::responses::{ComputeState, ComputeStatus};
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
// contents of an endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
@@ -70,7 +70,6 @@ pub struct EndpointConf {
http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
features: Vec<ComputeFeature>,
}
//
@@ -141,7 +140,6 @@ impl ComputeControlPlane {
// with this we basically test a case of waking up an idle compute, where
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates: true,
features: vec![],
});
ep.create_endpoint_dir()?;
@@ -156,7 +154,6 @@ impl ComputeControlPlane {
pg_port,
pg_version,
skip_pg_catalog_updates: true,
features: vec![],
})?,
)?;
std::fs::write(
@@ -218,9 +215,6 @@ pub struct Endpoint {
// Optimizations
skip_pg_catalog_updates: bool,
// Feature flags
features: Vec<ComputeFeature>,
}
impl Endpoint {
@@ -250,7 +244,6 @@ impl Endpoint {
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
features: conf.features,
})
}
@@ -526,7 +519,7 @@ impl Endpoint {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
features: self.features.clone(),
features: vec![],
cluster: Cluster {
cluster_id: None, // project ID: not used
name: None, // project name: not used

View File

@@ -90,9 +90,6 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity.
ActivityMonitorExperimental,
/// Enable running migrations
Migrations,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.

View File

@@ -1,11 +1,9 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::{Oid, TransactionId};
use serde::{Deserialize, Serialize};
use std::{fmt, ops::Range};
use std::fmt;
use crate::reltag::{BlockNumber, RelTag, SlruKind};
use crate::reltag::{BlockNumber, RelTag};
/// Key used in the Repository kv-store.
///
@@ -145,390 +143,12 @@ impl Key {
}
}
// Layout of the Key address space
//
// The Key struct, used to address the underlying key-value store, consists of
// 18 bytes, split into six fields. See 'Key' in repository.rs. We need to map
// all the data and metadata keys into those 18 bytes.
//
// Principles for the mapping:
//
// - Things that are often accessed or modified together, should be close to
// each other in the key space. For example, if a relation is extended by one
// block, we create a new key-value pair for the block data, and update the
// relation size entry. Because of that, the RelSize key comes after all the
// RelBlocks of a relation: the RelSize and the last RelBlock are always next
// to each other.
//
// The key space is divided into four major sections, identified by the first
// byte, and they form a hierarchy:
//
// 00 Relation data and metadata
//
// DbDir () -> (dbnode, spcnode)
// Filenodemap
// RelDir -> relnode forknum
// RelBlocks
// RelSize
//
// 01 SLRUs
//
// SlruDir kind
// SlruSegBlocks segno
// SlruSegSize
//
// 02 pg_twophase
//
// 03 misc
// Controlfile
// checkpoint
// pg_version
//
// 04 aux files
//
// Below is a full list of the keyspace allocation:
//
// DbDir:
// 00 00000000 00000000 00000000 00 00000000
//
// Filenodemap:
// 00 SPCNODE DBNODE 00000000 00 00000000
//
// RelDir:
// 00 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
//
// RelBlock:
// 00 SPCNODE DBNODE RELNODE FORK BLKNUM
//
// RelSize:
// 00 SPCNODE DBNODE RELNODE FORK FFFFFFFF
//
// SlruDir:
// 01 kind 00000000 00000000 00 00000000
//
// SlruSegBlock:
// 01 kind 00000001 SEGNO 00 BLKNUM
//
// SlruSegSize:
// 01 kind 00000001 SEGNO 00 FFFFFFFF
//
// TwoPhaseDir:
// 02 00000000 00000000 00000000 00 00000000
//
// TwoPhaseFile:
// 02 00000000 00000000 00000000 00 XID
//
// ControlFile:
// 03 00000000 00000000 00000000 00 00000000
//
// Checkpoint:
// 03 00000000 00000000 00000000 00 00000001
//
// AuxFiles:
// 03 00000000 00000000 00000000 00 00000002
//
//-- Section 01: relation data and metadata
/// Key of the DbDir entry: section byte `0x00` with every other field zero
/// (the "DbDir" row of the keyspace allocation table).
pub const DBDIR_KEY: Key = Key {
field1: 0x00,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
};
/// Returns the key range that shares the `00 SPCNODE DBNODE` prefix.
///
/// The range starts at the key whose remaining fields are all zero and ends
/// (exclusively) at the key whose remaining fields are all ones.
#[inline(always)]
pub fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
    let first = Key {
        field1: 0x00,
        field2: spcnode,
        field3: dbnode,
        field4: 0,
        field5: 0,
        field6: 0,
    };
    // Same prefix, maximal suffix.
    let end = Key {
        field4: 0xffffffff,
        field5: 0xff,
        field6: 0xffffffff,
        ..first
    };
    first..end
}
/// Builds the "Filenodemap" key of a database: `00 SPCNODE DBNODE` followed
/// by an all-zero suffix.
#[inline(always)]
pub fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
    let (field2, field3) = (spcnode, dbnode);
    Key {
        field1: 0x00,
        field2,
        field3,
        field4: 0,
        field5: 0,
        field6: 0,
    }
}
/// Builds the "RelDir" key of a database: `00 SPCNODE DBNODE`, a zero
/// relnode/fork, and `1` in the last field.
#[inline(always)]
pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
    let (field2, field3) = (spcnode, dbnode);
    Key {
        field1: 0x00,
        field2,
        field3,
        field4: 0,
        field5: 0,
        field6: 1,
    }
}
#[inline(always)]
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
field1: 0x00,
field2: rel.spcnode,
field3: rel.dbnode,
field4: rel.relnode,
field5: rel.forknum,
field6: blknum,
}
}
/// Builds the "RelSize" key of relation `rel`: the relation's prefix with
/// `0xffffffff` in the block-number field.
#[inline(always)]
pub fn rel_size_to_key(rel: RelTag) -> Key {
    let tablespace = rel.spcnode;
    let database = rel.dbnode;
    let relation = rel.relnode;
    let fork = rel.forknum;
    Key {
        field1: 0x00,
        field2: tablespace,
        field3: database,
        field4: relation,
        field5: fork,
        field6: 0xffffffff,
    }
}
#[inline(always)]
pub fn rel_key_range(rel: RelTag) -> Range<Key> {
Key {
field1: 0x00,
field2: rel.spcnode,
field3: rel.dbnode,
field4: rel.relnode,
field5: rel.forknum,
field6: 0,
}..Key {
field1: 0x00,
field2: rel.spcnode,
field3: rel.dbnode,
field4: rel.relnode,
field5: rel.forknum + 1,
field6: 0,
}
}
//-- Section 02: SLRUs
/// Builds the "SlruDir" key for `kind`: section byte `0x01`, the kind code in
/// the second field, and zeros everywhere else.
#[inline(always)]
pub fn slru_dir_to_key(kind: SlruKind) -> Key {
    let kind_code = match kind {
        SlruKind::Clog => 0x00,
        SlruKind::MultiXactMembers => 0x01,
        SlruKind::MultiXactOffsets => 0x02,
    };
    Key {
        field1: 0x01,
        field2: kind_code,
        field3: 0,
        field4: 0,
        field5: 0,
        field6: 0,
    }
}
/// Builds the "SlruSegBlock" key `01 kind 00000001 SEGNO 00 BLKNUM`.
#[inline(always)]
pub fn slru_block_to_key(kind: SlruKind, segno: u32, blknum: BlockNumber) -> Key {
    let kind_code = match kind {
        SlruKind::Clog => 0x00,
        SlruKind::MultiXactMembers => 0x01,
        SlruKind::MultiXactOffsets => 0x02,
    };
    Key {
        field1: 0x01,
        field2: kind_code,
        field3: 1,
        field4: segno,
        field5: 0,
        field6: blknum,
    }
}
/// Builds the "SlruSegSize" key `01 kind 00000001 SEGNO 00 FFFFFFFF`.
#[inline(always)]
pub fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
    let kind_code = match kind {
        SlruKind::Clog => 0x00,
        SlruKind::MultiXactMembers => 0x01,
        SlruKind::MultiXactOffsets => 0x02,
    };
    Key {
        field1: 0x01,
        field2: kind_code,
        field3: 1,
        field4: segno,
        field5: 0,
        field6: 0xffffffff,
    }
}
/// Returns the key range of one SLRU segment: from `01 kind 00000001 SEGNO 00
/// 00000000` up to (exclusively) the same prefix with `01` in the fifth field.
#[inline(always)]
pub fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
    let kind_code = match kind {
        SlruKind::Clog => 0x00,
        SlruKind::MultiXactMembers => 0x01,
        SlruKind::MultiXactOffsets => 0x02,
    };
    let start = Key {
        field1: 0x01,
        field2: kind_code,
        field3: 1,
        field4: segno,
        field5: 0,
        field6: 0,
    };
    // Only the fifth field differs between the two bounds.
    let end = Key { field5: 1, ..start };
    start..end
}
//-- Section 03: pg_twophase
/// Key of the TwoPhaseDir entry: section byte `0x02` with every other field
/// zero (the "TwoPhaseDir" row of the keyspace allocation table).
pub const TWOPHASEDIR_KEY: Key = Key {
field1: 0x02,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
};
/// Builds the "TwoPhaseFile" key for `xid`: section byte `0x02`, zeros, and
/// the transaction id in the last field.
#[inline(always)]
pub fn twophase_file_key(xid: TransactionId) -> Key {
    // Start from the all-zero key of section 0x02, then fill in the xid.
    let mut key = Key {
        field1: 0x02,
        field2: 0,
        field3: 0,
        field4: 0,
        field5: 0,
        field6: 0,
    };
    key.field6 = xid;
    key
}
/// Returns the key range holding exactly the "TwoPhaseFile" key of `xid`.
///
/// The exclusive end is the key of `xid + 1`; when that addition wraps, the
/// carry is stored in the fifth field so the end still sorts after the start.
#[inline(always)]
pub fn twophase_key_range(xid: TransactionId) -> Range<Key> {
    let start = Key {
        field1: 0x02,
        field2: 0,
        field3: 0,
        field4: 0,
        field5: 0,
        field6: xid,
    };
    let (next_xid, overflowed) = xid.overflowing_add(1);
    let end = Key {
        field5: u8::from(overflowed),
        field6: next_xid,
        ..start
    };
    start..end
}
//-- Section 03: Control file
/// Key of the ControlFile entry: section byte `0x03` with every other field zero.
pub const CONTROLFILE_KEY: Key = Key {
field1: 0x03,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
};
/// Key of the Checkpoint entry: section byte `0x03` with `1` in the last field.
pub const CHECKPOINT_KEY: Key = Key {
field1: 0x03,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 1,
};
/// Key of the AuxFiles entry: section byte `0x03` with `2` in the last field.
pub const AUX_FILES_KEY: Key = Key {
field1: 0x03,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 2,
};
// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.
// AUX_FILES currently stores only data for logical replication (slots etc), and
// we don't preserve these on a branch because safekeepers can't follow timeline
// switch (and generally it likely should be optional), so ignore these.
/// Returns `true` for every key except [`AUX_FILES_KEY`].
#[inline(always)]
pub fn is_inherited_key(key: Key) -> bool {
    let is_aux_files = key == AUX_FILES_KEY;
    !is_aux_files
}
/// Returns `true` if `key` is in section `0x00`, has a non-zero fourth field,
/// carries `FSM_FORKNUM` as its fork, and is not a size key (`0xffffffff`).
#[inline(always)]
pub fn is_rel_fsm_block_key(key: Key) -> bool {
    let in_rel_section = key.field1 == 0x00;
    let has_relnode = key.field4 != 0;
    let is_fsm_fork = key.field5 == FSM_FORKNUM;
    let is_block = key.field6 != 0xffffffff;
    in_rel_section && has_relnode && is_fsm_fork && is_block
}
/// Returns `true` if `key` is in section `0x00`, has a non-zero fourth field,
/// carries `VISIBILITYMAP_FORKNUM` as its fork, and is not a size key
/// (`0xffffffff`).
#[inline(always)]
pub fn is_rel_vm_block_key(key: Key) -> bool {
    let in_rel_section = key.field1 == 0x00;
    let has_relnode = key.field4 != 0;
    let is_vm_fork = key.field5 == VISIBILITYMAP_FORKNUM;
    let is_block = key.field6 != 0xffffffff;
    in_rel_section && has_relnode && is_vm_fork && is_block
}
/// Decodes an SLRU key into `(kind, segment number, block number)`.
///
/// # Errors
///
/// Fails if the first field is not `0x01`, or if the second field is not one
/// of the known SLRU kind codes.
#[inline(always)]
pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> {
    // Anything outside the SLRU section is rejected up front.
    if key.field1 != 0x01 {
        anyhow::bail!("unexpected value kind 0x{:02x}", key.field1);
    }
    let kind = match key.field2 {
        0x00 => SlruKind::Clog,
        0x01 => SlruKind::MultiXactMembers,
        0x02 => SlruKind::MultiXactOffsets,
        _ => anyhow::bail!("unrecognized slru kind 0x{:02x}", key.field2),
    };
    Ok((kind, key.field4, key.field6))
}
/// Returns `true` if `key` addresses an SLRU segment block.
#[inline(always)]
pub fn is_slru_block_key(key: Key) -> bool {
    let slru_related = key.field1 == 0x01;
    let not_slru_dir = key.field3 == 0x00000001;
    let not_seg_size = key.field6 != 0xffffffff;
    slru_related && not_slru_dir && not_seg_size
}
/// Returns `true` if `key` is in section `0x00`, has a non-zero fourth field,
/// and is not a size key (`0xffffffff` in the last field).
#[inline(always)]
pub fn is_rel_block_key(key: &Key) -> bool {
    let in_rel_section = key.field1 == 0x00;
    let has_relnode = key.field4 != 0;
    let is_block = key.field6 != 0xffffffff;
    in_rel_section && has_relnode && is_block
}
/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
#[inline(always)]
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
Ok(match key.field1 {
0x00 => (

View File

@@ -61,7 +61,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::pgdatadir_mapping::Version;
use crate::pgdatadir_mapping::{rel_block_to_key, Version};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -75,7 +75,6 @@ use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use crate::trace::Tracer;
use pageserver_api::key::rel_block_to_key;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -387,18 +386,12 @@ impl PageServerHandler {
/// Future that completes when we need to shut down the connection.
///
/// We currently need to shut down when any of the following happens:
/// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
/// 2. task_mgr requests shutdown of the connection
/// Reasons for need to shut down are:
/// - any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
/// - task_mgr requests shutdown of the connection
///
/// NB on (1): the connection's lifecycle is not actually tied to any of the
/// `shard_timelines`s' lifecycles. But it's _necessary_ in the current
/// implementation to be responsive to timeline cancellation because
/// the connection holds their `GateGuards` open (stored in `shard_timelines`).
/// We currently do the easy thing and terminate the connection if any of the
/// shard_timelines gets cancelled. But really, we could spend more effort
/// and simply remove the cancelled timeline from the `shard_timelines`, thereby
/// dropping the guard.
/// The need to check for `task_mgr` cancellation arises mainly from `handle_pagerequests`
/// where, at first, `shard_timelines` is empty, see <https://github.com/neondatabase/neon/pull/6388>
///
/// NB: keep in sync with [`Self::is_connection_cancelled`]
async fn await_connection_cancelled(&self) {
@@ -411,17 +404,16 @@ impl PageServerHandler {
// immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk
// missing any inserts to the map.
let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len());
use futures::future::Either;
cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher()));
cancellation_sources.extend(
self.shard_timelines
.values()
.map(|ht| Either::Right(ht.timeline.cancel.cancelled())),
);
FuturesUnordered::from_iter(cancellation_sources)
.next()
.await;
let mut futs = self
.shard_timelines
.values()
.map(|ht| ht.timeline.cancel.cancelled())
.collect::<FuturesUnordered<_>>();
tokio::select! {
_ = task_mgr::shutdown_watcher() => { }
_ = futs.next() => {}
}
}
/// Checking variant of [`Self::await_connection_cancelled`].

View File

@@ -13,12 +13,7 @@ use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes};
use pageserver_api::key::{
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::key::is_rel_block_key;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
@@ -1540,6 +1535,366 @@ struct SlruSegmentDirectory {
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// Layout of the Key address space
//
// The Key struct, used to address the underlying key-value store, consists of
// 18 bytes, split into six fields. See 'Key' in repository.rs. We need to map
// all the data and metadata keys into those 18 bytes.
//
// Principles for the mapping:
//
// - Things that are often accessed or modified together, should be close to
// each other in the key space. For example, if a relation is extended by one
// block, we create a new key-value pair for the block data, and update the
// relation size entry. Because of that, the RelSize key comes after all the
// RelBlocks of a relation: the RelSize and the last RelBlock are always next
// to each other.
//
// The key space is divided into four major sections, identified by the first
// byte, and they form a hierarchy:
//
// 00 Relation data and metadata
//
// DbDir () -> (dbnode, spcnode)
// Filenodemap
// RelDir -> relnode forknum
// RelBlocks
// RelSize
//
// 01 SLRUs
//
// SlruDir kind
// SlruSegBlocks segno
// SlruSegSize
//
// 02 pg_twophase
//
// 03 misc
// Controlfile
// checkpoint
// pg_version
//
// 04 aux files
//
// Below is a full list of the keyspace allocation:
//
// DbDir:
// 00 00000000 00000000 00000000 00 00000000
//
// Filenodemap:
// 00 SPCNODE DBNODE 00000000 00 00000000
//
// RelDir:
// 00 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
//
// RelBlock:
// 00 SPCNODE DBNODE RELNODE FORK BLKNUM
//
// RelSize:
// 00 SPCNODE DBNODE RELNODE FORK FFFFFFFF
//
// SlruDir:
// 01 kind 00000000 00000000 00 00000000
//
// SlruSegBlock:
// 01 kind 00000001 SEGNO 00 BLKNUM
//
// SlruSegSize:
// 01 kind 00000001 SEGNO 00 FFFFFFFF
//
// TwoPhaseDir:
// 02 00000000 00000000 00000000 00 00000000
//
// TwoPhaseFile:
// 02 00000000 00000000 00000000 00 XID
//
// ControlFile:
// 03 00000000 00000000 00000000 00 00000000
//
// Checkpoint:
// 03 00000000 00000000 00000000 00 00000001
//
// AuxFiles:
// 03 00000000 00000000 00000000 00 00000002
//
//-- Section 01: relation data and metadata
/// Key of the DbDir entry: section byte `0x00` with every other field zero
/// (the "DbDir" row of the keyspace allocation table).
const DBDIR_KEY: Key = Key {
field1: 0x00,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
};
/// Returns the key range that shares the `00 SPCNODE DBNODE` prefix.
///
/// The range starts at the key whose remaining fields are all zero and ends
/// (exclusively) at the key whose remaining fields are all ones.
fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
    let first = Key {
        field1: 0x00,
        field2: spcnode,
        field3: dbnode,
        field4: 0,
        field5: 0,
        field6: 0,
    };
    // Same prefix, maximal suffix.
    let end = Key {
        field4: 0xffffffff,
        field5: 0xff,
        field6: 0xffffffff,
        ..first
    };
    first..end
}
/// Builds the "Filenodemap" key of a database: `00 SPCNODE DBNODE` followed
/// by an all-zero suffix.
fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
    let (field2, field3) = (spcnode, dbnode);
    Key {
        field1: 0x00,
        field2,
        field3,
        field4: 0,
        field5: 0,
        field6: 0,
    }
}
/// Builds the "RelDir" key of a database: `00 SPCNODE DBNODE`, a zero
/// relnode/fork, and `1` in the last field.
fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
    let (field2, field3) = (spcnode, dbnode);
    Key {
        field1: 0x00,
        field2,
        field3,
        field4: 0,
        field5: 0,
        field6: 1,
    }
}
pub(crate) fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
field1: 0x00,
field2: rel.spcnode,
field3: rel.dbnode,
field4: rel.relnode,
field5: rel.forknum,
field6: blknum,
}
}
/// Builds the "RelSize" key of relation `rel`: the relation's prefix with
/// `0xffffffff` in the block-number field.
fn rel_size_to_key(rel: RelTag) -> Key {
    let tablespace = rel.spcnode;
    let database = rel.dbnode;
    let relation = rel.relnode;
    let fork = rel.forknum;
    Key {
        field1: 0x00,
        field2: tablespace,
        field3: database,
        field4: relation,
        field5: fork,
        field6: 0xffffffff,
    }
}
fn rel_key_range(rel: RelTag) -> Range<Key> {
Key {
field1: 0x00,
field2: rel.spcnode,
field3: rel.dbnode,
field4: rel.relnode,
field5: rel.forknum,
field6: 0,
}..Key {
field1: 0x00,
field2: rel.spcnode,
field3: rel.dbnode,
field4: rel.relnode,
field5: rel.forknum + 1,
field6: 0,
}
}
//-- Section 02: SLRUs
/// Builds the "SlruDir" key for `kind`: section byte `0x01`, the kind code in
/// the second field, and zeros everywhere else.
fn slru_dir_to_key(kind: SlruKind) -> Key {
    let kind_code = match kind {
        SlruKind::Clog => 0x00,
        SlruKind::MultiXactMembers => 0x01,
        SlruKind::MultiXactOffsets => 0x02,
    };
    Key {
        field1: 0x01,
        field2: kind_code,
        field3: 0,
        field4: 0,
        field5: 0,
        field6: 0,
    }
}
/// Builds the "SlruSegBlock" key `01 kind 00000001 SEGNO 00 BLKNUM`.
fn slru_block_to_key(kind: SlruKind, segno: u32, blknum: BlockNumber) -> Key {
    let kind_code = match kind {
        SlruKind::Clog => 0x00,
        SlruKind::MultiXactMembers => 0x01,
        SlruKind::MultiXactOffsets => 0x02,
    };
    Key {
        field1: 0x01,
        field2: kind_code,
        field3: 1,
        field4: segno,
        field5: 0,
        field6: blknum,
    }
}
/// Builds the "SlruSegSize" key `01 kind 00000001 SEGNO 00 FFFFFFFF`.
fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
    let kind_code = match kind {
        SlruKind::Clog => 0x00,
        SlruKind::MultiXactMembers => 0x01,
        SlruKind::MultiXactOffsets => 0x02,
    };
    Key {
        field1: 0x01,
        field2: kind_code,
        field3: 1,
        field4: segno,
        field5: 0,
        field6: 0xffffffff,
    }
}
/// Returns the key range of one SLRU segment: from `01 kind 00000001 SEGNO 00
/// 00000000` up to (exclusively) the same prefix with `01` in the fifth field.
fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
    let kind_code = match kind {
        SlruKind::Clog => 0x00,
        SlruKind::MultiXactMembers => 0x01,
        SlruKind::MultiXactOffsets => 0x02,
    };
    let start = Key {
        field1: 0x01,
        field2: kind_code,
        field3: 1,
        field4: segno,
        field5: 0,
        field6: 0,
    };
    // Only the fifth field differs between the two bounds.
    let end = Key { field5: 1, ..start };
    start..end
}
//-- Section 03: pg_twophase
/// Key of the TwoPhaseDir entry: section byte `0x02` with every other field
/// zero (the "TwoPhaseDir" row of the keyspace allocation table).
const TWOPHASEDIR_KEY: Key = Key {
field1: 0x02,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
};
/// Builds the "TwoPhaseFile" key for `xid`: section byte `0x02`, zeros, and
/// the transaction id in the last field.
fn twophase_file_key(xid: TransactionId) -> Key {
    // Start from the all-zero key of section 0x02, then fill in the xid.
    let mut key = Key {
        field1: 0x02,
        field2: 0,
        field3: 0,
        field4: 0,
        field5: 0,
        field6: 0,
    };
    key.field6 = xid;
    key
}
/// Returns the key range holding exactly the "TwoPhaseFile" key of `xid`.
///
/// The exclusive end is the key of `xid + 1`; when that addition wraps, the
/// carry is stored in the fifth field so the end still sorts after the start.
fn twophase_key_range(xid: TransactionId) -> Range<Key> {
    let start = Key {
        field1: 0x02,
        field2: 0,
        field3: 0,
        field4: 0,
        field5: 0,
        field6: xid,
    };
    let (next_xid, overflowed) = xid.overflowing_add(1);
    let end = Key {
        field5: u8::from(overflowed),
        field6: next_xid,
        ..start
    };
    start..end
}
//-- Section 03: Control file
/// Key of the ControlFile entry: section byte `0x03` with every other field zero.
const CONTROLFILE_KEY: Key = Key {
field1: 0x03,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
};
/// Key of the Checkpoint entry: section byte `0x03` with `1` in the last field.
const CHECKPOINT_KEY: Key = Key {
field1: 0x03,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 1,
};
/// Key of the AuxFiles entry: section byte `0x03` with `2` in the last field.
const AUX_FILES_KEY: Key = Key {
field1: 0x03,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 2,
};
// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.
// AUX_FILES currently stores only data for logical replication (slots etc), and
// we don't preserve these on a branch because safekeepers can't follow timeline
// switch (and generally it likely should be optional), so ignore these.
/// Returns `true` for every key except `AUX_FILES_KEY`.
pub fn is_inherited_key(key: Key) -> bool {
    let is_aux_files = key == AUX_FILES_KEY;
    !is_aux_files
}
/// Returns `true` if `key` is in section `0x00`, has a non-zero fourth field,
/// carries `FSM_FORKNUM` as its fork, and is not a size key (`0xffffffff`).
pub fn is_rel_fsm_block_key(key: Key) -> bool {
    let in_rel_section = key.field1 == 0x00;
    let has_relnode = key.field4 != 0;
    let is_fsm_fork = key.field5 == FSM_FORKNUM;
    let is_block = key.field6 != 0xffffffff;
    in_rel_section && has_relnode && is_fsm_fork && is_block
}
/// Returns `true` if `key` is in section `0x00`, has a non-zero fourth field,
/// carries `VISIBILITYMAP_FORKNUM` as its fork, and is not a size key
/// (`0xffffffff`).
pub fn is_rel_vm_block_key(key: Key) -> bool {
    let in_rel_section = key.field1 == 0x00;
    let has_relnode = key.field4 != 0;
    let is_vm_fork = key.field5 == VISIBILITYMAP_FORKNUM;
    let is_block = key.field6 != 0xffffffff;
    in_rel_section && has_relnode && is_vm_fork && is_block
}
/// Decodes an SLRU key into `(kind, segment number, block number)`.
///
/// # Errors
///
/// Fails if the first field is not `0x01`, or if the second field is not one
/// of the known SLRU kind codes.
pub fn key_to_slru_block(key: Key) -> anyhow::Result<(SlruKind, u32, BlockNumber)> {
    // Anything outside the SLRU section is rejected up front.
    if key.field1 != 0x01 {
        anyhow::bail!("unexpected value kind 0x{:02x}", key.field1);
    }
    let kind = match key.field2 {
        0x00 => SlruKind::Clog,
        0x01 => SlruKind::MultiXactMembers,
        0x02 => SlruKind::MultiXactOffsets,
        _ => anyhow::bail!("unrecognized slru kind 0x{:02x}", key.field2),
    };
    Ok((kind, key.field4, key.field6))
}
/// Returns `true` if `key` addresses an SLRU segment block.
fn is_slru_block_key(key: Key) -> bool {
    let slru_related = key.field1 == 0x01;
    let not_slru_dir = key.field3 == 0x00000001;
    let not_seg_size = key.field6 != 0xffffffff;
    slru_related && not_slru_dir && not_seg_size
}
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {

View File

@@ -716,10 +716,6 @@ impl Tenant {
// stayed in Activating for such a long time that shutdown found it in
// that state.
tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
// Make the tenant broken so that set_stopping will not hang waiting for it to leave
// the Attaching state. This is an over-reaction (nothing really broke, the tenant is
// just shutting down), but ensures progress.
make_broken(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"));
return Ok(());
},
)
@@ -1881,7 +1877,7 @@ impl Tenant {
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(), timeline::CompactionError> {
) -> anyhow::Result<(), timeline::CompactionError> {
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
if !self.is_active() {
return Ok(());

View File

@@ -619,8 +619,8 @@ impl LayerMap {
}
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Vec<Arc<PersistentLayerDesc>> {
self.l0_delta_layers.to_vec()
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
Ok(self.l0_delta_layers.to_vec())
}
/// debugging function to print out the contents of the layer map

View File

@@ -237,7 +237,7 @@ use utils::id::{TenantId, TimelineId};
use self::index::IndexPart;
use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
use super::upload_queue::{self, SetDeletedFlagProgress};
use super::upload_queue::SetDeletedFlagProgress;
use super::Generation;
pub(crate) use download::{is_temp_download_file, list_remote_timelines};
@@ -621,9 +621,7 @@ impl RemoteTimelineClient {
///
/// Like schedule_index_upload_for_metadata_update(), this merely adds
/// the upload to the upload queue and returns quickly.
pub(crate) fn schedule_index_upload_for_file_changes(
self: &Arc<Self>,
) -> Result<(), upload_queue::NotInitialized> {
pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
@@ -668,7 +666,7 @@ impl RemoteTimelineClient {
pub(crate) fn schedule_layer_file_upload(
self: &Arc<Self>,
layer: ResidentLayer,
) -> Result<(), upload_queue::NotInitialized> {
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
@@ -877,7 +875,7 @@ impl RemoteTimelineClient {
self: &Arc<Self>,
compacted_from: &[Layer],
compacted_to: &[ResidentLayer],
) -> Result<(), upload_queue::NotInitialized> {
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;

View File

@@ -290,7 +290,7 @@ impl Layer {
}
/// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
let downloaded = self.0.get_or_maybe_download(true, None).await?;
Ok(ResidentLayer {
@@ -1174,7 +1174,7 @@ pub(crate) enum EvictionError {
/// Error internal to the [`LayerInner::get_or_maybe_download`]
#[derive(Debug, thiserror::Error)]
pub(crate) enum DownloadError {
enum DownloadError {
#[error("timeline has already shutdown")]
TimelineShutdown,
#[error("no remote storage configured")]
@@ -1197,15 +1197,6 @@ pub(crate) enum DownloadError {
PostStatFailed(#[source] std::io::Error),
}
impl DownloadError {
    /// Reports whether this error is one of the cancellation-style variants.
    ///
    /// Returns `true` for `TimelineShutdown` and `DownloadCancelled`, and
    /// `false` for every other variant.
    pub fn is_cancelled(&self) -> bool {
        matches!(self, Self::TimelineShutdown | Self::DownloadCancelled)
    }
}
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
NotFound,

View File

@@ -9,7 +9,6 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -182,11 +181,8 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(
&e,
error_run_count,
&wait_duration,
cancel.is_cancelled(),
error!(
"Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
wait_duration
} else {
@@ -214,38 +210,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
fn log_compaction_error(
e: &CompactionError,
error_run_count: u32,
sleep_duration: &std::time::Duration,
task_cancelled: bool,
) {
use crate::tenant::upload_queue::NotInitialized;
use crate::tenant::PageReconstructError;
use CompactionError::*;
enum LooksLike {
Info,
Error,
}
let decision = match e {
ShuttingDown => None,
_ if task_cancelled => Some(LooksLike::Info),
Other(e) => Some(LooksLike::Error),
};
match decision {
Some(LooksLike::Info) => info!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
),
Some(LooksLike::Error) => error!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
),
None => {}
}
}
///
/// GC task's main loop
///

View File

@@ -73,8 +73,8 @@ use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
};
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::pgdatadir_mapping::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::tenant::config::TenantConfOpt;
use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIndex;
@@ -103,14 +103,11 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{config::TenantConf, upload_queue::NotInitialized};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{
remote_timeline_client::index::{IndexLayerMetadata, IndexPart},
storage_layer::layer,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
@@ -394,7 +391,8 @@ pub(crate) enum PageReconstructError {
#[error("Ancestor LSN wait error: {0}")]
AncestorLsnTimeout(#[from] WaitLsnError),
#[error("timeline shutting down")]
/// The operation was cancelled
#[error("Cancelled")]
Cancelled,
/// The ancestor of this is being stopped
@@ -406,19 +404,6 @@ pub(crate) enum PageReconstructError {
WalRedo(anyhow::Error),
}
impl PageReconstructError {
/// Returns true if this error indicates a tenant/timeline shutdown alike situation
pub(crate) fn is_stopping(&self) -> bool {
use PageReconstructError::*;
match self {
Other(_) => false,
AncestorLsnTimeout(_) => false,
Cancelled | AncestorStopping(_) => true,
WalRedo(_) => false,
}
}
}
#[derive(thiserror::Error, Debug)]
enum FlushLayerError {
/// Timeline cancellation token was cancelled
@@ -847,7 +832,8 @@ impl Timeline {
// "enough".
let layers = self
.create_image_layers(&partitioning, lsn, false, &image_ctx)
.await?;
.await
.map_err(anyhow::Error::from)?;
if let Some(remote_client) = &self.remote_client {
for layer in layers {
remote_client.schedule_layer_file_upload(layer)?;
@@ -3214,46 +3200,7 @@ pub(crate) enum CompactionError {
ShuttingDown,
/// Compaction cannot be done right now; page reconstruction and so on.
#[error(transparent)]
Other(anyhow::Error),
}
impl CompactionError {
fn other<E>(err: E) -> Self
where
E: std::error::Error + Send + Sync + 'static,
{
CompactionError::Other(anyhow::Error::new(err))
}
}
impl From<PageReconstructError> for CompactionError {
fn from(value: PageReconstructError) -> Self {
if value.is_stopping() {
CompactionError::ShuttingDown
} else {
CompactionError::other(value)
}
}
}
impl From<NotInitialized> for CompactionError {
fn from(value: NotInitialized) -> Self {
if value.is_stopping() {
CompactionError::ShuttingDown
} else {
CompactionError::other(value)
}
}
}
impl From<layer::DownloadError> for CompactionError {
fn from(value: layer::DownloadError) -> Self {
if value.is_cancelled() {
CompactionError::ShuttingDown
} else {
CompactionError::other(value)
}
}
Other(#[from] anyhow::Error),
}
#[serde_as]
@@ -3386,7 +3333,7 @@ impl Timeline {
stats.read_lock_held_spawn_blocking_startup_micros =
stats.read_lock_acquisition_micros.till_now(); // set by caller
let layers = guard.layer_map();
let level0_deltas = layers.get_level0_deltas();
let level0_deltas = layers.get_level0_deltas()?;
let mut level0_deltas = level0_deltas
.into_iter()
.map(|x| guard.get_from_desc(&x))
@@ -3433,8 +3380,7 @@ impl Timeline {
delta
.download_and_keep_resident()
.await
.context("download layer for failpoint")
.map_err(CompactionError::Other)?,
.context("download layer for failpoint")?,
);
}
tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
@@ -3518,7 +3464,7 @@ impl Timeline {
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
all_keys.extend(l.load_keys(ctx).await?);
}
// FIXME: should spawn_blocking the rest of this function
@@ -3538,10 +3484,7 @@ impl Timeline {
// does not make much sense, because the largest holes will correspond to field1/field2 changes.
// But we are mostly interested in eliminating holes which cause generation of excessive image layers.
// That is why it is better to measure the size of a hole as the number of covering image layers.
let coverage_size = layers
.image_coverage(&key_range, last_record_lsn)
.map_err(CompactionError::Other)?
.len();
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
@@ -3640,7 +3583,7 @@ impl Timeline {
key, lsn, ref val, ..
} in all_values_iter
{
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
let value = val.load(ctx).await?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
@@ -3697,8 +3640,7 @@ impl Timeline {
.take()
.unwrap()
.finish(prev_key.unwrap().next(), self)
.await
.map_err(CompactionError::Other)?,
.await?,
);
writer = None;
@@ -3728,8 +3670,7 @@ impl Timeline {
lsn_range.clone()
},
)
.await
.map_err(CompactionError::Other)?,
.await?,
);
}
@@ -3740,12 +3681,7 @@ impl Timeline {
});
if !self.shard_identity.is_key_disposable(&key) {
writer
.as_mut()
.unwrap()
.put_value(key, lsn, value)
.await
.map_err(CompactionError::Other)?;
writer.as_mut().unwrap().put_value(key, lsn, value).await?;
} else {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
@@ -3761,12 +3697,7 @@ impl Timeline {
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(
writer
.finish(prev_key.unwrap().next(), self)
.await
.map_err(CompactionError::Other)?,
);
new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
}
// Sync layers
@@ -3795,8 +3726,7 @@ impl Timeline {
// minimize latency.
par_fsync::par_fsync_async(&layer_paths)
.await
.context("fsync all new layers")
.map_err(CompactionError::Other)?;
.context("fsync all new layers")?;
let timeline_dir = self
.conf
@@ -3804,8 +3734,7 @@ impl Timeline {
par_fsync::par_fsync_async(&[timeline_dir])
.await
.context("fsync of timeline dir")
.map_err(CompactionError::Other)?;
.context("fsync of timeline dir")?;
}
stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();

View File

@@ -126,27 +126,6 @@ pub(super) struct UploadQueueStopped {
pub(super) deleted_at: SetDeletedFlagProgress,
}
/// Reasons why `UploadQueue::initialized_mut` cannot hand out the initialized queue.
#[derive(thiserror::Error, Debug)]
pub(crate) enum NotInitialized {
/// The queue is still in the `Uninitialized` state.
#[error("queue is in state Uninitialized")]
Uninitialized,
/// The queue is in the `Stopped` state.
// NOTE(review): the message says "Stopping" while the variant is `Stopped` — confirm which wording is intended.
#[error("queue is in state Stopping")]
Stopped,
/// The queue is `Initialized`, but its `shutting_down` flag is set.
#[error("queue is shutting down")]
ShuttingDown,
}
impl NotInitialized {
pub(crate) fn is_stopping(&self) -> bool {
use NotInitialized::*;
match self {
Uninitialized => false,
Stopped => true,
ShuttingDown => true,
}
}
}
impl UploadQueue {
pub(crate) fn initialize_empty_remote(
&mut self,
@@ -234,20 +213,18 @@ impl UploadQueue {
Ok(self.initialized_mut().expect("we just set it"))
}
pub(crate) fn initialized_mut(
&mut self,
) -> Result<&mut UploadQueueInitialized, NotInitialized> {
use UploadQueue::*;
pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
Uninitialized => Err(NotInitialized::Uninitialized.into()),
Initialized(x) => {
if x.shutting_down {
Err(NotInitialized::ShuttingDown.into())
} else {
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
anyhow::bail!("queue is in state {}", self.as_str())
}
UploadQueue::Initialized(x) => {
if !x.shutting_down {
Ok(x)
} else {
anyhow::bail!("queue is shutting down")
}
}
Stopped(_) => Err(NotInitialized::Stopped.into()),
}
}

View File

@@ -33,12 +33,11 @@ use utils::failpoint_support;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::{DatadirModification, Version};
use crate::pgdatadir_mapping::*;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use crate::walrecord::*;
use crate::ZERO_PAGE;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};

View File

@@ -47,10 +47,11 @@ use crate::metrics::{
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM,
WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
};
use crate::pgdatadir_mapping::key_to_slru_block;
use crate::repository::Key;
use crate::walrecord::NeonWalRecord;
use pageserver_api::key::{key_to_rel_block, key_to_slru_block};
use pageserver_api::key::key_to_rel_block;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;

View File

@@ -637,7 +637,7 @@ HandleAlterRole(AlterRoleStmt *stmt)
ListCell *option;
const char *role_name = stmt->role->rolename;
if (RoleIsNeonSuperuser(role_name) && !superuser())
if (RoleIsNeonSuperuser(role_name))
elog(ERROR, "can't ALTER neon_superuser");
foreach(option, stmt->options)

View File

@@ -64,26 +64,10 @@ static int max_reconnect_attempts = 60;
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
/*
* The "neon.pageserver_connstring" GUC is marked with the PGC_SIGHUP option,
* allowing it to be changed using pg_reload_conf(). The control plane can
* update the connection string if the pageserver crashes, is relocated, or
* new shards are added. A copy of the current value of the GUC is kept in
* shared memory, updated by the postmaster, because regular backends don't
* reload the config during query execution, but we might need to re-establish
* the pageserver connection with the new connection string even in the middle
* of a query.
*
* The shared memory copy is protected by a lockless algorithm using two
* atomic counters. The counters allow a backend to quickly check if the value
* has changed since last access, and to detect and retry copying the value if
* the postmaster changes the value concurrently. (Postmaster doesn't have a
* PGPROC entry and therefore cannot use LWLocks.)
*/
typedef struct
{
pg_atomic_uint64 begin_update_counter;
pg_atomic_uint64 end_update_counter;
LWLockId lock;
pg_atomic_uint64 update_counter;
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
} PagestoreShmemState;
@@ -100,7 +84,7 @@ static bool pageserver_flush(void);
static void pageserver_disconnect(void);
static bool
PagestoreShmemIsValid(void)
PagestoreShmemIsValid()
{
return pagestore_shared && UsedShmemSegAddr;
}
@@ -114,58 +98,31 @@ CheckPageserverConnstring(char **newval, void **extra, GucSource source)
static void
AssignPageserverConnstring(const char *newval, void *extra)
{
/*
* Only postmaster updates the copy in shared memory.
*/
if (!PagestoreShmemIsValid() || IsUnderPostmaster)
if (!PagestoreShmemIsValid())
return;
pg_atomic_add_fetch_u64(&pagestore_shared->begin_update_counter, 1);
pg_write_barrier();
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
pg_write_barrier();
pg_atomic_add_fetch_u64(&pagestore_shared->end_update_counter, 1);
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
LWLockRelease(pagestore_shared->lock);
}
static bool
CheckConnstringUpdated(void)
CheckConnstringUpdated()
{
if (!PagestoreShmemIsValid())
return false;
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->begin_update_counter);
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
}
static void
ReloadConnstring(void)
ReloadConnstring()
{
uint64 begin_update_counter;
uint64 end_update_counter;
if (!PagestoreShmemIsValid())
return;
/*
* Copy the current setting from shared to local memory. Postmaster can
* update the value concurrently, in which case we would copy a garbled
* mix of the old and new values. We will detect it because the counters
* won't match, and retry. But it's important that we don't do anything
* within the retry-loop that would depend on the string having valid
* contents.
*/
do
{
begin_update_counter = pg_atomic_read_u64(&pagestore_shared->begin_update_counter);
end_update_counter = pg_atomic_read_u64(&pagestore_shared->end_update_counter);
pg_read_barrier();
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pg_read_barrier();
}
while (begin_update_counter != end_update_counter
|| begin_update_counter != pg_atomic_read_u64(&pagestore_shared->begin_update_counter)
|| end_update_counter != pg_atomic_read_u64(&pagestore_shared->end_update_counter));
pagestore_local_counter = end_update_counter;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
}
static bool
@@ -180,7 +137,7 @@ pageserver_connect(int elevel)
static TimestampTz last_connect_time = 0;
static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC;
TimestampTz now;
uint64_t us_since_last_connect;
uint64_t us_since_last_connect;
Assert(!connected);
@@ -190,7 +147,7 @@ pageserver_connect(int elevel)
}
now = GetCurrentTimestamp();
us_since_last_connect = now - last_connect_time;
us_since_last_connect = now - last_connect_time;
if (us_since_last_connect < delay_us)
{
pg_usleep(delay_us - us_since_last_connect);
@@ -548,8 +505,8 @@ PagestoreShmemInit(void)
&found);
if (!found)
{
pg_atomic_init_u64(&pagestore_shared->begin_update_counter, 0);
pg_atomic_init_u64(&pagestore_shared->end_update_counter, 0);
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
@@ -574,6 +531,7 @@ pagestore_shmem_request(void)
#endif
RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}
static void

View File

@@ -81,6 +81,7 @@ postgres-native-tls.workspace = true
postgres-protocol.workspace = true
redis.workspace = true
smol_str.workspace = true
smallvec = { workspace = true, features = ["serde"] }
workspace_hack.workspace = true

View File

@@ -4,7 +4,9 @@ pub mod backend;
pub use backend::BackendType;
mod credentials;
pub use credentials::{check_peer_addr_is_in_list, endpoint_sni, ComputeUserInfoMaybeEndpoint};
pub use credentials::{
check_peer_addr_is_in_list, endpoint_sni, ComputeUserInfoMaybeEndpoint, IpPattern,
};
mod password_hack;
pub use password_hack::parse_endpoint_param;

View File

@@ -35,6 +35,8 @@ use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, warn};
use super::IpPattern;
/// This type serves two purposes:
///
/// * When `T` is `()`, it's just a regular auth backend selector
@@ -55,7 +57,7 @@ pub enum BackendType<'a, T> {
pub trait TestBackend: Send + Sync + 'static {
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
fn get_allowed_ips(&self) -> Result<Vec<SmolStr>, console::errors::GetAuthInfoError>;
fn get_allowed_ips(&self) -> Result<Vec<IpPattern>, console::errors::GetAuthInfoError>;
}
impl std::fmt::Display for BackendType<'_, ()> {

View File

@@ -7,7 +7,7 @@ use crate::{
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use smol_str::SmolStr;
use std::{collections::HashSet, net::IpAddr};
use std::{collections::HashSet, net::IpAddr, str::FromStr};
use thiserror::Error;
use tracing::{info, warn};
@@ -151,30 +151,51 @@ impl ComputeUserInfoMaybeEndpoint {
}
}
pub fn check_peer_addr_is_in_list(peer_addr: &IpAddr, ip_list: &Vec<SmolStr>) -> bool {
if ip_list.is_empty() {
return true;
}
for ip in ip_list {
// We expect that all ip addresses from control plane are correct.
// However, if some of them are broken, we still can check the others.
match parse_ip_pattern(ip) {
Ok(pattern) => {
if check_ip(peer_addr, &pattern) {
return true;
}
}
Err(err) => warn!("Cannot parse ip: {}; err: {}", ip, err),
}
}
false
/// Decides whether `peer_addr` is admitted by `ip_list`.
///
/// An empty list admits every address; otherwise the address must satisfy
/// `check_ip` for at least one pattern in the list.
pub fn check_peer_addr_is_in_list(peer_addr: &IpAddr, ip_list: &[IpPattern]) -> bool {
    if ip_list.is_empty() {
        return true;
    }
    ip_list.iter().any(|allowed| check_ip(peer_addr, allowed))
}
#[derive(Debug, Clone, Eq, PartialEq)]
enum IpPattern {
pub enum IpPattern {
Subnet(ipnet::IpNet),
Range(IpAddr, IpAddr),
Single(IpAddr),
None,
}
impl<'de> serde::de::Deserialize<'de> for IpPattern {
    /// Deserializes an `IpPattern` from a string.
    ///
    /// A string that `parse_ip_pattern` rejects does not fail deserialization:
    /// a warning is logged and `IpPattern::None` is produced instead.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        /// Visitor that accepts a string and parses it leniently.
        struct PatternVisitor;

        impl<'de> serde::de::Visitor<'de> for PatternVisitor {
            type Value = IpPattern;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("comma separated list with ip address, ip address range, or ip address subnet mask")
            }

            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                match parse_ip_pattern(v) {
                    Ok(pattern) => Ok(pattern),
                    Err(e) => {
                        // Unparsable input is tolerated: log it and use the placeholder variant.
                        warn!("Cannot parse ip pattern {v}: {e}");
                        Ok(IpPattern::None)
                    }
                }
            }
        }

        deserializer.deserialize_str(PatternVisitor)
    }
}
impl FromStr for IpPattern {
    type Err = anyhow::Error;

    /// Parses a textual pattern by delegating to `parse_ip_pattern`.
    ///
    /// Unlike the `Deserialize` impl, a parse failure is returned to the caller as-is.
    fn from_str(pattern: &str) -> anyhow::Result<Self> {
        parse_ip_pattern(pattern)
    }
}
fn parse_ip_pattern(pattern: &str) -> anyhow::Result<IpPattern> {
@@ -196,6 +217,7 @@ fn check_ip(ip: &IpAddr, pattern: &IpPattern) -> bool {
IpPattern::Subnet(subnet) => subnet.contains(ip),
IpPattern::Range(start, end) => start <= ip && ip <= end,
IpPattern::Single(addr) => addr == ip,
IpPattern::None => false,
}
}
@@ -206,6 +228,7 @@ fn project_name_valid(name: &str) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use ComputeUserInfoParseError::*;
#[test]
@@ -415,21 +438,17 @@ mod tests {
#[test]
fn test_check_peer_addr_is_in_list() {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
assert!(check_peer_addr_is_in_list(&peer_addr, &vec![]));
assert!(check_peer_addr_is_in_list(
&peer_addr,
&vec!["127.0.0.1".into()]
));
assert!(!check_peer_addr_is_in_list(
&peer_addr,
&vec!["8.8.8.8".into()]
));
fn check(v: serde_json::Value) -> bool {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
let ip_list: Vec<IpPattern> = serde_json::from_value(v).unwrap();
check_peer_addr_is_in_list(&peer_addr, &ip_list)
}
assert!(check(json!([])));
assert!(check(json!(["127.0.0.1"])));
assert!(!check(json!(["8.8.8.8"])));
// If there is an incorrect address, it will be skipped.
assert!(check_peer_addr_is_in_list(
&peer_addr,
&vec!["88.8.8".into(), "127.0.0.1".into()]
));
assert!(check(json!(["88.8.8", "127.0.0.1"])));
}
#[test]
fn test_parse_ip_v4() -> anyhow::Result<()> {

View File

@@ -257,7 +257,6 @@ async fn main() -> anyhow::Result<()> {
maintenance_tasks
.spawn(notifications::task_main(url.to_owned(), cache.clone()));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
#[cfg(feature = "testing")]
proxy::console::provider::ConsoleBackend::Postgres(_) => {}

View File

@@ -1,17 +1,17 @@
use std::{
collections::HashSet,
convert::Infallible,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use dashmap::DashMap;
use rand::{thread_rng, Rng};
use hashlink::LruCache;
use parking_lot::Mutex;
use smallvec::SmallVec;
use smol_str::SmolStr;
use tokio::time::Instant;
use tracing::{debug, info};
use tracing::info;
use crate::{config::ProjectInfoCacheOptions, console::AuthSecret};
use crate::{auth::IpPattern, config::ProjectInfoCacheOptions, console::AuthSecret};
use super::{Cache, Cached};
@@ -42,56 +42,10 @@ impl<T> From<T> for Entry<T> {
}
}
/// Data cached for a single endpoint: its role secrets and its allowed-IP list.
#[derive(Default)]
struct EndpointInfo {
/// Cached auth secret per role name, each stamped with its creation time.
secret: std::collections::HashMap<SmolStr, Entry<AuthSecret>>,
/// Cached allowed-IP list; `None` when nothing is cached or after invalidation.
allowed_ips: Option<Entry<Arc<Vec<SmolStr>>>>,
}
impl EndpointInfo {
/// Returns `true` when a cutoff is set and the entry was created strictly after it.
///
/// With no cutoff (`None`) the result is always `false`.
fn check_ignore_cache(ignore_cache_since: Option<Instant>, created_at: Instant) -> bool {
    matches!(ignore_cache_since, Some(cutoff) if cutoff < created_at)
}
/// Looks up the cached secret for `role_name`.
///
/// Returns `None` when no secret is cached for the role or when the cached
/// entry was not created strictly after `valid_since`. Otherwise returns a
/// clone of the secret together with the result of `check_ignore_cache` for
/// that entry's creation time.
pub fn get_role_secret(
    &self,
    role_name: &SmolStr,
    valid_since: Instant,
    ignore_cache_since: Option<Instant>,
) -> Option<(AuthSecret, bool)> {
    self.secret
        .get(role_name)
        .filter(|entry| valid_since < entry.created_at)
        .map(|entry| {
            (
                entry.value.clone(),
                Self::check_ignore_cache(ignore_cache_since, entry.created_at),
            )
        })
}
/// Returns the cached allowed-IP list, if any.
///
/// Yields `None` when nothing is cached or when the cached entry was not
/// created strictly after `valid_since`. Otherwise yields a clone of the
/// shared list together with the result of `check_ignore_cache` for that
/// entry's creation time.
pub fn get_allowed_ips(
    &self,
    valid_since: Instant,
    ignore_cache_since: Option<Instant>,
) -> Option<(Arc<Vec<SmolStr>>, bool)> {
    self.allowed_ips
        .as_ref()
        .filter(|entry| valid_since < entry.created_at)
        .map(|entry| {
            (
                entry.value.clone(),
                Self::check_ignore_cache(ignore_cache_since, entry.created_at),
            )
        })
}
/// Forgets the cached allowed-IP list, leaving `allowed_ips` as `None`.
pub fn invalidate_allowed_ips(&mut self) {
    // The previous entry, if any, is dropped right here.
    drop(self.allowed_ips.take());
}
pub fn invalidate_role_secret(&mut self, role_name: &SmolStr) {
self.secret.remove(role_name);
/// Returns `true` when a cutoff is set and the entry was created strictly after it.
///
/// With no cutoff (`None`) the result is always `false`.
fn check_ignore_cache(ignore_cache_since: Option<Instant>, created_at: Instant) -> bool {
    matches!(ignore_cache_since, Some(cutoff) if cutoff < created_at)
}
@@ -103,12 +57,33 @@ impl EndpointInfo {
/// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
/// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
pub struct ProjectInfoCacheImpl {
cache: DashMap<SmolStr, EndpointInfo>,
ip_cache: Mutex<LruCache<SmolStr, Entry<Arc<Vec<IpPattern>>>>>,
role_cache: Mutex<LruCache<(SmolStr, SmolStr), Entry<AuthSecret>>>,
project2ep: DashMap<SmolStr, HashSet<SmolStr>>,
config: ProjectInfoCacheOptions,
// endpoints per project:
// P90: 1
// P99: 2
// P995: 3
// P999: 10
// P9999: 186
//
// Assuming 1 million projects with this distribution:
// (0.9 * 1 + 0.09 * 2 + 0.005 * 3 + 0.004 * 10 + 0.0009 * 186) * 1,000,000
// =~ 1,500,000 endpoints
//
// 1,000,000 * size_of(SmolStr) = 24MB
// 1,500,000 * size_of(SmolStr) = 36MB
// SmallVec inline overhead: 8B * 0.9 * 1,000,000 = 7.2MB
// SmallVec outline overhead: 32B * 0.1 * 1,000,000 = 3.2MB
//
// Total size: 70.4MB.
//
// We do not need to prune this hashmap and can safely
// keep it in memory up until 100s of millions of projects
project2ep: DashMap<SmolStr, SmallVec<[SmolStr; 1]>>,
start_time: Instant,
ttl: Duration,
ttl_disabled_since_us: AtomicU64,
}
@@ -121,9 +96,7 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
endpoint_info.invalidate_allowed_ips();
}
self.ip_cache.lock().remove(&endpoint_id);
}
}
fn invalidate_role_secret_for_project(&self, project_id: &SmolStr, role_name: &SmolStr) {
@@ -137,9 +110,9 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
endpoint_info.invalidate_role_secret(role_name);
}
self.role_cache
.lock()
.remove(&(endpoint_id, role_name.clone()));
}
}
fn enable_ttl(&self) {
@@ -148,7 +121,7 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
}
fn disable_ttl(&self) {
let new_ttl = (self.start_time.elapsed() + self.config.ttl).as_micros() as u64;
let new_ttl = (self.start_time.elapsed() + self.ttl).as_micros() as u64;
self.ttl_disabled_since_us
.store(new_ttl, std::sync::atomic::Ordering::Relaxed);
}
@@ -157,9 +130,10 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
impl ProjectInfoCacheImpl {
pub fn new(config: ProjectInfoCacheOptions) -> Self {
Self {
cache: DashMap::new(),
ip_cache: Mutex::new(LruCache::new(config.size)),
role_cache: Mutex::new(LruCache::new(config.size * config.max_roles)),
project2ep: DashMap::new(),
config,
ttl: config.ttl,
ttl_disabled_since_us: AtomicU64::new(u64::MAX),
start_time: Instant::now(),
}
@@ -171,9 +145,17 @@ impl ProjectInfoCacheImpl {
role_name: &SmolStr,
) -> Option<Cached<&Self, AuthSecret>> {
let (valid_since, ignore_cache_since) = self.get_cache_times();
let endpoint_info = self.cache.get(endpoint_id)?;
let (value, ignore_cache) =
endpoint_info.get_role_secret(role_name, valid_since, ignore_cache_since)?;
let (value, ignore_cache) = {
let mut cache = self.role_cache.lock();
let secret = cache.get(&(endpoint_id.clone(), role_name.clone()))?;
if secret.created_at <= valid_since {
return None;
}
(
secret.value.clone(),
check_ignore_cache(ignore_cache_since, secret.created_at),
)
};
if !ignore_cache {
let cached = Cached {
token: Some((
@@ -186,14 +168,23 @@ impl ProjectInfoCacheImpl {
}
Some(Cached::new_uncached(value))
}
pub fn get_allowed_ips(
&self,
endpoint_id: &SmolStr,
) -> Option<Cached<&Self, Arc<Vec<SmolStr>>>> {
) -> Option<Cached<&Self, Arc<Vec<IpPattern>>>> {
let (valid_since, ignore_cache_since) = self.get_cache_times();
let endpoint_info = self.cache.get(endpoint_id)?;
let value = endpoint_info.get_allowed_ips(valid_since, ignore_cache_since);
let (value, ignore_cache) = value?;
let (value, ignore_cache) = {
let mut cache = self.ip_cache.lock();
let allowed_ips = cache.get(endpoint_id)?;
if allowed_ips.created_at <= valid_since {
return None;
}
(
allowed_ips.value.clone(),
check_ignore_cache(ignore_cache_since, allowed_ips.created_at),
)
};
if !ignore_cache {
let cached = Cached {
token: Some((self, CachedLookupInfo::new_allowed_ips(endpoint_id.clone()))),
@@ -203,6 +194,7 @@ impl ProjectInfoCacheImpl {
}
Some(Cached::new_uncached(value))
}
pub fn insert_role_secret(
&self,
project_id: &SmolStr,
@@ -210,42 +202,33 @@ impl ProjectInfoCacheImpl {
role_name: &SmolStr,
secret: AuthSecret,
) {
if self.cache.len() >= self.config.size {
// If there are too many entries, wait until the next gc cycle.
return;
}
self.inser_project2endpoint(project_id, endpoint_id);
let mut entry = self.cache.entry(endpoint_id.clone()).or_default();
if entry.secret.len() < self.config.max_roles {
entry.secret.insert(role_name.clone(), secret.into());
}
self.insert_project2endpoint(project_id, endpoint_id);
self.role_cache
.lock()
.insert((endpoint_id.clone(), role_name.clone()), secret.into());
}
pub fn insert_allowed_ips(
&self,
project_id: &SmolStr,
endpoint_id: &SmolStr,
allowed_ips: Arc<Vec<SmolStr>>,
allowed_ips: Arc<Vec<IpPattern>>,
) {
if self.cache.len() >= self.config.size {
// If there are too many entries, wait until the next gc cycle.
return;
}
self.inser_project2endpoint(project_id, endpoint_id);
self.cache
.entry(endpoint_id.clone())
self.insert_project2endpoint(project_id, endpoint_id);
self.ip_cache
.lock()
.insert(endpoint_id.clone(), allowed_ips.into());
}
fn insert_project2endpoint(&self, project_id: &SmolStr, endpoint_id: &SmolStr) {
self.project2ep
.entry(project_id.clone())
.or_default()
.allowed_ips = Some(allowed_ips.into());
}
fn inser_project2endpoint(&self, project_id: &SmolStr, endpoint_id: &SmolStr) {
if let Some(mut endpoints) = self.project2ep.get_mut(project_id) {
endpoints.insert(endpoint_id.clone());
} else {
self.project2ep
.insert(project_id.clone(), HashSet::from([endpoint_id.clone()]));
}
.push(endpoint_id.clone());
}
fn get_cache_times(&self) -> (Instant, Option<Instant>) {
let mut valid_since = Instant::now() - self.config.ttl;
let mut valid_since = Instant::now() - self.ttl;
// Only ignore cache if ttl is disabled.
let ttl_disabled_since_us = self
.ttl_disabled_since_us
@@ -260,37 +243,6 @@ impl ProjectInfoCacheImpl {
};
(valid_since, ignore_cache_since)
}
/// Background loop that periodically triggers `gc`.
///
/// The tick period is `config.gc_interval` divided by the number of shards
/// of `cache`. On each tick `gc` runs only if `cache` holds at least
/// `config.size` entries. The loop has no exit path, so as written this
/// future never resolves.
pub async fn gc_worker(&self) -> anyhow::Result<Infallible> {
    let shard_count = self.cache.shards().len() as u32;
    let mut ticker = tokio::time::interval(self.config.gc_interval / shard_count);
    loop {
        ticker.tick().await;
        // Below the size limit there is nothing to reclaim; wait for the next tick.
        if self.cache.len() >= self.config.size {
            self.gc();
        }
    }
}
/// Evicts from `cache` every endpoint listed under one randomly chosen shard of `project2ep`.
///
/// The `project2ep` entries themselves are left in place; only `cache` is modified.
fn gc(&self) {
// Pick the index of the shard to process uniformly at random.
let shard = thread_rng().gen_range(0..self.project2ep.shards().len());
debug!(shard, "project_info_cache: performing epoch reclamation");
// acquire a random shard lock
// `removed` counts `remove` calls, whether or not the endpoint was actually present in `cache`.
let mut removed = 0;
// The name `shard` is rebound from the index to the write guard of that shard.
let shard = self.project2ep.shards()[shard].write();
for (_, endpoints) in shard.iter() {
for endpoint in endpoints.get().iter() {
self.cache.remove(endpoint);
removed += 1;
}
}
// We can drop this shard only after making sure that all endpoints are removed.
drop(shard);
info!("project_info_cache: removed {removed} endpoints");
}
}
/// Lookup info for project info cache.
@@ -331,14 +283,12 @@ impl Cache for ProjectInfoCacheImpl {
fn invalidate(&self, key: &Self::LookupInfo<SmolStr>) {
match &key.lookup_type {
LookupType::RoleSecret(role_name) => {
if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
endpoint_info.invalidate_role_secret(role_name);
}
self.role_cache
.lock()
.remove(&(key.endpoint_id.clone(), role_name.clone()));
}
LookupType::AllowedIps => {
if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
endpoint_info.invalidate_allowed_ips();
}
self.ip_cache.lock().remove(&key.endpoint_id);
}
}
}
@@ -356,9 +306,8 @@ mod tests {
tokio::time::pause();
let cache = ProjectInfoCacheImpl::new(ProjectInfoCacheOptions {
size: 2,
max_roles: 2,
max_roles: 1,
ttl: Duration::from_secs(1),
gc_interval: Duration::from_secs(600),
});
let project_id = "project".into();
let endpoint_id = "endpoint".into();
@@ -366,7 +315,10 @@ mod tests {
let user2: SmolStr = "user2".into();
let secret1 = AuthSecret::Scram(ServerSecret::mock(user1.as_str(), [1; 32]));
let secret2 = AuthSecret::Scram(ServerSecret::mock(user2.as_str(), [2; 32]));
let allowed_ips = Arc::new(vec!["allowed_ip1".into(), "allowed_ip2".into()]);
let allowed_ips = Arc::new(vec![
"127.0.0.1".parse().unwrap(),
"127.0.0.2".parse().unwrap(),
]);
cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone());
cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone());
cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone());
@@ -382,7 +334,7 @@ mod tests {
let user3: SmolStr = "user3".into();
let secret3 = AuthSecret::Scram(ServerSecret::mock(user3.as_str(), [3; 32]));
cache.insert_role_secret(&project_id, &endpoint_id, &user3, secret3.clone());
assert!(cache.get_role_secret(&endpoint_id, &user3).is_none());
assert!(cache.get_role_secret(&endpoint_id, &user1).is_none(),);
let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
assert!(cached.cached());
@@ -404,7 +356,6 @@ mod tests {
size: 2,
max_roles: 2,
ttl: Duration::from_secs(1),
gc_interval: Duration::from_secs(600),
}));
cache.clone().disable_ttl();
tokio::time::advance(Duration::from_secs(2)).await;
@@ -415,7 +366,10 @@ mod tests {
let user2: SmolStr = "user2".into();
let secret1 = AuthSecret::Scram(ServerSecret::mock(user1.as_str(), [1; 32]));
let secret2 = AuthSecret::Scram(ServerSecret::mock(user2.as_str(), [2; 32]));
let allowed_ips = Arc::new(vec!["allowed_ip1".into(), "allowed_ip2".into()]);
let allowed_ips = Arc::new(vec![
"127.0.0.1".parse().unwrap(),
"127.0.0.2".parse().unwrap(),
]);
cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone());
cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone());
cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone());
@@ -452,7 +406,6 @@ mod tests {
size: 2,
max_roles: 2,
ttl: Duration::from_secs(1),
gc_interval: Duration::from_secs(600),
}));
let project_id = "project".into();
@@ -461,7 +414,10 @@ mod tests {
let user2: SmolStr = "user2".into();
let secret1 = AuthSecret::Scram(ServerSecret::mock(user1.as_str(), [1; 32]));
let secret2 = AuthSecret::Scram(ServerSecret::mock(user2.as_str(), [2; 32]));
let allowed_ips = Arc::new(vec!["allowed_ip1".into(), "allowed_ip2".into()]);
let allowed_ips = Arc::new(vec![
"127.0.0.1".parse().unwrap(),
"127.0.0.2".parse().unwrap(),
]);
cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone());
cache.clone().disable_ttl();
tokio::time::advance(Duration::from_millis(100)).await;

View File

@@ -361,14 +361,11 @@ pub struct ProjectInfoCacheOptions {
pub ttl: Duration,
/// Max number of roles per endpoint.
pub max_roles: usize,
/// Gc interval.
pub gc_interval: Duration,
}
impl ProjectInfoCacheOptions {
/// Default options for [`crate::cache::project_info::ProjectInfoCacheImpl`].
pub const CACHE_DEFAULT_OPTIONS: &'static str =
"size=10000,ttl=4m,max_roles=10,gc_interval=60m";
pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=10000,ttl=4m,max_roles=5,gc_interval=60m";
/// Parse cache options passed via cmdline.
/// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
@@ -376,7 +373,7 @@ impl ProjectInfoCacheOptions {
let mut size = None;
let mut ttl = None;
let mut max_roles = None;
let mut gc_interval = None;
let mut _gc_interval = None;
for option in options.split(',') {
let (key, value) = option
@@ -387,7 +384,7 @@ impl ProjectInfoCacheOptions {
"size" => size = Some(value.parse()?),
"ttl" => ttl = Some(humantime::parse_duration(value)?),
"max_roles" => max_roles = Some(value.parse()?),
"gc_interval" => gc_interval = Some(humantime::parse_duration(value)?),
"gc_interval" => _gc_interval = Some(humantime::parse_duration(value)?),
unknown => bail!("unknown key: {unknown}"),
}
}
@@ -401,7 +398,6 @@ impl ProjectInfoCacheOptions {
size: size.context("missing `size`")?,
ttl: ttl.context("missing `ttl`")?,
max_roles: max_roles.context("missing `max_roles`")?,
gc_interval: gc_interval.context("missing `gc_interval`")?,
})
}
}

View File

@@ -2,6 +2,8 @@ use serde::Deserialize;
use smol_str::SmolStr;
use std::fmt;
use crate::auth::IpPattern;
/// Generic error response with human-readable description.
/// Note that we can't always present it to user as is.
#[derive(Debug, Deserialize)]
@@ -14,7 +16,7 @@ pub struct ConsoleError {
#[derive(Deserialize)]
pub struct GetRoleSecret {
pub role_secret: Box<str>,
pub allowed_ips: Option<Vec<Box<str>>>,
pub allowed_ips: Option<Vec<IpPattern>>,
pub project_id: Option<Box<str>>,
}

View File

@@ -4,7 +4,7 @@ pub mod neon;
use super::messages::MetricsAuxInfo;
use crate::{
auth::backend::ComputeUserInfo,
auth::{backend::ComputeUserInfo, IpPattern},
cache::{project_info::ProjectInfoCacheImpl, Cached, TimedLru},
compute,
config::{CacheOptions, ProjectInfoCacheOptions},
@@ -212,7 +212,7 @@ pub enum AuthSecret {
pub struct AuthInfo {
pub secret: Option<AuthSecret>,
/// List of IP addresses allowed for the authorization.
pub allowed_ips: Vec<SmolStr>,
pub allowed_ips: Vec<IpPattern>,
/// Project ID. This is used for cache invalidation.
pub project_id: Option<SmolStr>,
}
@@ -236,7 +236,7 @@ pub struct NodeInfo {
pub type NodeInfoCache = TimedLru<SmolStr, NodeInfo>;
pub type CachedNodeInfo = Cached<&'static NodeInfoCache>;
pub type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, AuthSecret>;
pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<SmolStr>>>;
pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPattern>>>;
/// This will allocate on each call, but the http requests alone
/// already require a few allocations, so it should be fine.

View File

@@ -4,14 +4,13 @@ use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo,
};
use crate::cache::Cached;
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret};
use crate::context::RequestMonitoring;
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
use crate::{auth::IpPattern, cache::Cached};
use async_trait::async_trait;
use futures::TryFutureExt;
use smol_str::SmolStr;
use std::sync::Arc;
use std::{str::FromStr, sync::Arc};
use thiserror::Error;
use tokio_postgres::{config::SslMode, Client};
use tracing::{error, info, info_span, warn, Instrument};
@@ -88,7 +87,9 @@ impl Api {
{
Some(s) => {
info!("got allowed_ips: {s}");
s.split(',').map(String::from).collect()
s.split(',')
.map(|s| IpPattern::from_str(s).unwrap())
.collect()
}
None => vec![],
};
@@ -100,7 +101,7 @@ impl Api {
.await?;
Ok(AuthInfo {
secret,
allowed_ips: allowed_ips.iter().map(SmolStr::from).collect(),
allowed_ips,
project_id: None,
})
}

View File

@@ -14,7 +14,6 @@ use crate::{
};
use async_trait::async_trait;
use futures::TryFutureExt;
use itertools::Itertools;
use smol_str::SmolStr;
use std::sync::Arc;
use tokio::time::Instant;
@@ -89,12 +88,7 @@ impl Api {
let secret = scram::ServerSecret::parse(&body.role_secret)
.map(AuthSecret::Scram)
.ok_or(GetAuthInfoError::BadSecret)?;
let allowed_ips = body
.allowed_ips
.into_iter()
.flatten()
.map(SmolStr::from)
.collect_vec();
let allowed_ips = body.allowed_ips.unwrap_or_default();
ALLOWED_IPS_NUMBER.observe(allowed_ips.len() as f64);
Ok(AuthInfo {
secret: Some(secret),
@@ -195,6 +189,7 @@ impl super::Api for Api {
Ok(auth_info.secret.map(Cached::new_uncached))
}
#[tracing::instrument(skip_all)]
async fn get_allowed_ips(
&self,
ctx: &mut RequestMonitoring,

View File

@@ -6,13 +6,13 @@ use super::connect_compute::ConnectMechanism;
use super::retry::ShouldRetry;
use super::*;
use crate::auth::backend::{ComputeUserInfo, TestBackend};
use crate::auth::IpPattern;
use crate::config::CertResolver;
use crate::console::{self, CachedNodeInfo, NodeInfo};
use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT};
use crate::{auth, http, sasl, scram};
use async_trait::async_trait;
use rstest::rstest;
use smol_str::SmolStr;
use tokio_postgres::config::SslMode;
use tokio_postgres::tls::{MakeTlsConnect, NoTls};
use tokio_postgres_rustls::{MakeRustlsConnect, RustlsStream};
@@ -471,7 +471,7 @@ impl TestBackend for TestConnectMechanism {
}
}
fn get_allowed_ips(&self) -> Result<Vec<SmolStr>, console::errors::GetAuthInfoError> {
fn get_allowed_ips(&self) -> Result<Vec<IpPattern>, console::errors::GetAuthInfoError> {
unimplemented!("not used in tests")
}
}

View File

@@ -110,7 +110,7 @@ pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_backed_up_segments_total",
"Number of WAL segments backed up to the S3"
"Number of WAL segments backed up to the broker"
)
.expect("Failed to register safekeeper_backed_up_segments_total counter")
});
@@ -337,7 +337,6 @@ pub struct TimelineCollector {
flushed_wal_seconds: GaugeVec,
collect_timeline_metrics: Gauge,
timelines_count: IntGauge,
active_timelines_count: IntGauge,
}
impl Default for TimelineCollector {
@@ -521,13 +520,6 @@ impl TimelineCollector {
.unwrap();
descs.extend(timelines_count.desc().into_iter().cloned());
let active_timelines_count = IntGauge::new(
"safekeeper_active_timelines",
"Total number of active timelines",
)
.unwrap();
descs.extend(active_timelines_count.desc().into_iter().cloned());
TimelineCollector {
descs,
commit_lsn,
@@ -548,7 +540,6 @@ impl TimelineCollector {
flushed_wal_seconds,
collect_timeline_metrics,
timelines_count,
active_timelines_count,
}
}
}
@@ -581,7 +572,6 @@ impl Collector for TimelineCollector {
let timelines = GlobalTimelines::get_all();
let timelines_count = timelines.len();
let mut active_timelines_count = 0;
// Prometheus Collector is sync, and data is stored under async lock. To
// bridge the gap with a crutch, collect data in spawned thread with
@@ -600,10 +590,6 @@ impl Collector for TimelineCollector {
let timeline_id = tli.ttid.timeline_id.to_string();
let labels = &[tenant_id.as_str(), timeline_id.as_str()];
if tli.timeline_is_active {
active_timelines_count += 1;
}
self.commit_lsn
.with_label_values(labels)
.set(tli.mem_state.commit_lsn.into());
@@ -695,8 +681,6 @@ impl Collector for TimelineCollector {
// report total number of timelines
self.timelines_count.set(timelines_count as i64);
self.active_timelines_count
.set(active_timelines_count as i64);
mfs.extend(self.timelines_count.collect());
mfs

View File

@@ -10,15 +10,11 @@ use crate::{GlobalTimelines, SafeKeeperConf};
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
let wal_removal_interval = Duration::from_millis(5000);
loop {
let now = tokio::time::Instant::now();
let mut active_timelines = 0;
let tlis = GlobalTimelines::get_all();
for tli in &tlis {
if !tli.is_active().await {
continue;
}
active_timelines += 1;
let ttid = tli.ttid;
async {
if let Err(e) = tli.maybe_persist_control_file().await {
@@ -31,17 +27,6 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
.instrument(info_span!("WAL removal", ttid = %ttid))
.await;
}
let elapsed = now.elapsed();
let total_timelines = tlis.len();
if elapsed > wal_removal_interval {
info!(
"WAL removal is too long, processed {} active timelines ({} total) in {:?}",
active_timelines, total_timelines, elapsed
);
}
sleep(wal_removal_interval).await;
}
}

View File

@@ -2914,7 +2914,6 @@ class Endpoint(PgProtocol):
# Write it back updated
with open(config_path, "w") as file:
log.info(json.dumps(dict(data_dict, **kwargs)))
json.dump(dict(data_dict, **kwargs), file, indent=4)
# Mock the extension part of spec passed from control plane for local testing

View File

@@ -248,15 +248,8 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
# We don't have compute_ctl here, so create neon_superuser manually
cur.execute("CREATE ROLE neon_superuser NOLOGIN CREATEDB CREATEROLE")
# Contrary to popular belief, being superman does not make you superuser
cur.execute("CREATE ROLE superman LOGIN NOSUPERUSER PASSWORD 'jungle_man'")
with ddl.pg.cursor(user="superman", password="jungle_man") as superman_cur:
# We allow real SUPERUSERs to ALTER neon_superuser
with pytest.raises(psycopg2.InternalError):
superman_cur.execute("ALTER ROLE neon_superuser LOGIN")
cur.execute("ALTER ROLE neon_superuser LOGIN")
with pytest.raises(psycopg2.InternalError):
cur.execute("ALTER ROLE neon_superuser LOGIN")
with pytest.raises(psycopg2.InternalError):
cur.execute("CREATE DATABASE trololobus WITH OWNER neon_superuser")

View File

@@ -1,37 +0,0 @@
import time
from fixtures.neon_fixtures import NeonEnv
def test_migrations(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_migrations", "empty")
endpoint = env.endpoints.create("test_migrations")
log_path = endpoint.endpoint_path() / "compute.log"
endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"])
endpoint.start()
time.sleep(1) # Sleep to let migrations run
with endpoint.cursor() as cur:
cur.execute("SELECT id FROM neon_migration.migration_id")
migration_id = cur.fetchall()
assert migration_id[0][0] == 2
with open(log_path, "r") as log_file:
logs = log_file.read()
assert "INFO handle_migrations: Ran 2 migrations" in logs
endpoint.stop()
endpoint.start()
time.sleep(1) # Sleep to let migrations run
with endpoint.cursor() as cur:
cur.execute("SELECT id FROM neon_migration.migration_id")
migration_id = cur.fetchall()
assert migration_id[0][0] == 2
with open(log_path, "r") as log_file:
logs = log_file.read()
assert "INFO handle_migrations: Ran 0 migrations" in logs

View File

@@ -1,42 +0,0 @@
import threading
import time
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
# Test updating neon.pageserver_connstring setting on the fly.
#
# This merely changes some whitespace in the connection string, so
# this doesn't prove that the new string actually takes effect. But at
# least the code gets exercised.
def test_pageserver_reconnect(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
env.neon_cli.create_branch("test_pageserver_restarts")
endpoint = env.endpoints.create_start("test_pageserver_restarts")
n_reconnects = 1000
timeout = 0.01
scale = 10
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", f"-T{int(n_reconnects*timeout)}", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread.start()
with closing(endpoint.connect()) as con:
with con.cursor() as c:
c.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = c.fetchall()[0][0]
for i in range(n_reconnects):
time.sleep(timeout)
c.execute(
"alter system set neon.pageserver_connstring=%s",
(connstring + (" " * (i % 2)),),
)
c.execute("select pg_reload_conf()")
thread.join()

View File

@@ -66,7 +66,7 @@ rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["raw_value"] }
smallvec = { version = "1", default-features = false, features = ["write"] }
smallvec = { version = "1", default-features = false, features = ["serde", "write"] }
subtle = { version = "2" }
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }