Compare commits

..

2 Commits

Author SHA1 Message Date
Conrad Ludgate
abdc9bb4ba hakari 2023-11-16 09:41:04 +01:00
Conrad Ludgate
eac2e7498c start transition 2023-11-15 22:41:19 +01:00
21 changed files with 352 additions and 1019 deletions

View File

@@ -852,7 +852,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.19.0
VM_BUILDER_VERSION: v0.18.5
steps:
- name: Checkout
@@ -874,7 +874,8 @@ jobs:
- name: Build vm image
run: |
./vm-builder \
-spec=vm-image-spec.yaml \
-enable-file-cache \
-cgroup-uid=postgres \
-src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
-dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}

2
Cargo.lock generated
View File

@@ -2994,12 +2994,10 @@ name = "pageserver_api"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"byteorder",
"bytes",
"const_format",
"enum-map",
"hex",
"postgres_ffi",
"serde",
"serde_json",

View File

@@ -86,7 +86,7 @@ hostname = "0.3.1"
http-types = { version = "2", default-features = false }
humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
hyper = { version = "0.14", features=["backports"] }
hyper-tungstenite = "0.11"
inotify = "0.10.2"
itertools = "0.10"

View File

@@ -714,23 +714,6 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -
cargo pgrx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control
#########################################################################################
#
# Layer "pg-wait-sampling-pg-build"
# compile pg_wait_sampling extension
#
#########################################################################################
FROM build-deps AS pg-wait-sampling-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/postgrespro/pg_wait_sampling/archive/refs/tags/v1.1.5.tar.gz -O pg_wait_sampling.tar.gz && \
echo 'a03da6a413f5652ce470a3635ed6ebba528c74cb26aa4cfced8aff8a8441f81ec6dd657ff62cd6ce96a4e6ce02cad9f2519ae9525367ece60497aa20faafde5c pg_wait_sampling.tar.gz' | sha512sum -c && \
mkdir pg_wait_sampling-src && cd pg_wait_sampling-src && tar xvzf ../pg_wait_sampling.tar.gz --strip-components=1 -C . && \
make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) && \
make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_wait_sampling.control
#########################################################################################
#
# Layer "neon-pg-ext-build"
@@ -767,7 +750,6 @@ COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-wait-sampling-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -18,7 +18,6 @@ use camino::Utf8PathBuf;
use pageserver_api::models::{
self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -409,7 +408,7 @@ impl PageServerNode {
};
let request = models::TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
new_tenant_id,
generation,
config,
};

View File

@@ -17,9 +17,5 @@ postgres_ffi.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true
hex.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -1,142 +0,0 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use serde::{Deserialize, Serialize};
use std::fmt;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}

View File

@@ -4,10 +4,8 @@ use const_format::formatcp;
/// Public API types
pub mod control_api;
pub mod key;
pub mod models;
pub mod reltag;
pub mod shard;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

View File

@@ -16,7 +16,7 @@ use utils::{
lsn::Lsn,
};
use crate::{reltag::RelTag, shard::TenantShardId};
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
@@ -187,7 +187,7 @@ pub struct TimelineCreateRequest {
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
pub new_tenant_id: TenantShardId,
pub new_tenant_id: TenantId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,

View File

@@ -1,321 +0,0 @@
use std::{ops::RangeInclusive, str::FromStr};
use hex::FromHex;
use serde::{Deserialize, Serialize};
use utils::id::TenantId;
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardCount(pub u8);
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
}
impl ShardNumber {
pub const MAX: Self = Self(u8::MAX);
}
/// TenantShardId identify the units of work for the Pageserver.
///
/// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
///
/// # The second shard in a two-shard tenant
/// 072f1291a5310026820b2fe4b2968934-0102
///
/// Historically, tenants could not have multiple shards, and were identified
/// by TenantId. To support this, TenantShardId has a special legacy
/// mode where `shard_count` is equal to zero: this represents a single-sharded
/// tenant which should be written as a TenantId with no suffix.
///
/// The human-readable encoding of TenantShardId, such as used in API URLs,
/// is both forward and backward compatible: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
///
/// Note that the binary encoding is _not_ backward compatible, because
/// at the time sharding is introduced, there are no existing binary structures
/// containing TenantId that we need to handle.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
impl TenantShardId {
pub fn unsharded(tenant_id: TenantId) -> Self {
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
},
)
}
pub fn shard_slug(&self) -> String {
format!("{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
impl std::fmt::Display for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.shard_count != ShardCount(0) {
write!(
f,
"{}-{:02x}{:02x}",
self.tenant_id, self.shard_number.0, self.shard_count.0
)
} else {
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
// is distinct from the normal single shard case (shard count == 1).
self.tenant_id.fmt(f)
}
}
}
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for TenantShardId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
if s.len() == 32 {
// Legacy case: no shard specified
Ok(Self {
tenant_id: TenantId::from_str(s)?,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
})
} else if s.len() == 37 {
let bytes = s.as_bytes();
let tenant_id = TenantId::from_hex(&bytes[0..32])?;
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
Ok(Self {
tenant_id,
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 18]> for TenantShardId {
fn from(b: [u8; 18]) -> Self {
let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
Self {
tenant_id: TenantId::from(tenant_id_bytes),
shard_number: ShardNumber(b[16]),
shard_count: ShardCount(b[17]),
}
}
}
impl Serialize for TenantShardId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
let mut packed: [u8; 18] = [0; 18];
packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
packed[16] = self.shard_number.0;
packed[17] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for TenantShardId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = TenantShardId;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 18])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 18] = Deserialize::deserialize(s)?;
Ok(TenantShardId::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
TenantShardId::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
18,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use bincode;
use utils::{id::TenantId, Hex};
use super::*;
const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
#[test]
fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
let example = TenantShardId {
tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
shard_count: ShardCount(10),
shard_number: ShardNumber(7),
};
let encoded = format!("{example}");
let expected = format!("{EXAMPLE_TENANT_ID}-070a");
assert_eq!(&encoded, &expected);
let decoded = TenantShardId::from_str(&encoded)?;
assert_eq!(example, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
let example = TenantShardId {
tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
shard_count: ShardCount(10),
shard_number: ShardNumber(7),
};
let encoded = bincode::serialize(&example).unwrap();
let expected: [u8; 18] = [
0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
0xf6, 0xfc, 0x07, 0x0a,
];
assert_eq!(Hex(&encoded), Hex(&expected));
let decoded = bincode::deserialize(&encoded).unwrap();
assert_eq!(example, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
// Test that TenantShardId can decode a TenantId in human
// readable form
let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
let encoded = format!("{example}");
assert_eq!(&encoded, EXAMPLE_TENANT_ID);
let decoded = TenantShardId::from_str(&encoded)?;
assert_eq!(example, decoded.tenant_id);
assert_eq!(decoded.shard_count, ShardCount(0));
assert_eq!(decoded.shard_number, ShardNumber(0));
Ok(())
}
#[test]
fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
// Test that a legacy TenantShardId encodes into a form that
// can be decoded as TenantId
let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
let example = TenantShardId::unsharded(example_tenant_id);
let encoded = format!("{example}");
assert_eq!(&encoded, EXAMPLE_TENANT_ID);
let decoded = TenantId::from_str(&encoded)?;
assert_eq!(example_tenant_id, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
// Unlike in human readable encoding, binary encoding does not
// do any special handling of legacy unsharded TenantIds: this test
// is equivalent to the main test for binary encoding, just verifying
// that the same behavior applies when we have used `unsharded()` to
// construct a TenantShardId.
let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
let encoded = bincode::serialize(&example).unwrap();
let expected: [u8; 18] = [
0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
0xf6, 0xfc, 0x00, 0x00,
];
assert_eq!(Hex(&encoded), Hex(&expected));
let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
assert_eq!(example, decoded);
Ok(())
}
}

View File

@@ -16,7 +16,7 @@ aws-sdk-s3.workspace = true
aws-credential-types.workspace = true
bytes.workspace = true
camino.workspace = true
hyper = { workspace = true, features = ["stream"] }
hyper = { workspace = true }
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }

View File

@@ -1,7 +1,7 @@
//! Tracing wrapper for Hyper HTTP server
use hyper::HeaderMap;
use hyper::{Body, Request, Response};
use hyper::{body::HttpBody, Request, Response};
use std::future::Future;
use tracing::Instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -35,14 +35,14 @@ pub enum OtelName<'a> {
/// instrumentation libraries at:
/// <https://opentelemetry.io/registry/?language=rust&component=instrumentation>
/// If a Hyper crate appears, consider switching to that.
pub async fn tracing_handler<F, R>(
req: Request<Body>,
pub async fn tracing_handler<F, R, B1: HttpBody, B2: HttpBody>(
req: Request<B1>,
handler: F,
otel_name: OtelName<'_>,
) -> Response<Body>
) -> Response<B2>
where
F: Fn(Request<Body>) -> R,
R: Future<Output = Response<Body>>,
F: Fn(Request<B1>) -> R,
R: Future<Output = Response<B2>>,
{
// Create a tracing span, with context propagated from the incoming
// request if any.

View File

@@ -16,7 +16,6 @@ use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
@@ -420,9 +419,9 @@ async fn timeline_create_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
check_permission(&request, Some(tenant_id))?;
let new_timeline_id = request_data.new_timeline_id;
@@ -431,7 +430,7 @@ async fn timeline_create_handler(
let state = get_state(&request);
async {
let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?;
let tenant = mgr::get_tenant(tenant_id, true)?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -465,10 +464,7 @@ async fn timeline_create_handler(
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
}
}
.instrument(info_span!("timeline_create",
tenant_id = %tenant_shard_id.tenant_id,
shard = %tenant_shard_id.shard_slug(),
timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.instrument(info_span!("timeline_create", %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await
}
@@ -664,15 +660,14 @@ async fn timeline_delete_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
mgr::delete_timeline(tenant_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", %tenant_id, %timeline_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
@@ -686,14 +681,11 @@ async fn tenant_detach_handler(
check_permission(&request, Some(tenant_id))?;
let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
// This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(
conf,
tenant_shard_id,
tenant_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
@@ -810,16 +802,13 @@ async fn tenant_delete_handler(
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
// TODO openapi spec
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let state = get_state(&request);
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_shard_id)
.instrument(info_span!("tenant_delete_handler",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
))
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
.instrument(info_span!("tenant_delete_handler", %tenant_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
@@ -1149,10 +1138,9 @@ async fn put_tenant_location_config_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
@@ -1161,13 +1149,9 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
if let Err(e) =
mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
))
.await
if let Err(e) = mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach", %tenant_id))
.await
{
match e {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
@@ -1184,7 +1168,7 @@ async fn put_tenant_location_config_handler(
state
.tenant_manager
.upsert_location(tenant_shard_id, location_conf, &ctx)
.upsert_location(tenant_id, location_conf, &ctx)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
@@ -1768,7 +1752,7 @@ pub fn make_router(
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
.delete("/v1/tenant/:tenant_shard_id", |r| {
.delete("/v1/tenant/:tenant_id", |r| {
api_handler(r, tenant_delete_handler)
})
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
@@ -1780,13 +1764,13 @@ pub fn make_router(
.get("/v1/tenant/:tenant_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
.put("/v1/tenant/:tenant_shard_id/location_config", |r| {
.put("/v1/tenant/:tenant_id/location_config", |r| {
api_handler(r, put_tenant_location_config_handler)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})
.post("/v1/tenant/:tenant_shard_id/timeline", |r| {
.post("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_create_handler)
})
.post("/v1/tenant/:tenant_id/attach", |r| {
@@ -1830,7 +1814,7 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| api_handler(r, timeline_download_remote_layers_handler_get),
)
.delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
api_handler(r, timeline_delete_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {

View File

@@ -1,11 +1,106 @@
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{AddAssign, Range};
use std::time::Duration;
pub use pageserver_api::key::{Key, KEY_SIZE};
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
@@ -34,6 +129,49 @@ pub fn singleton_range(key: Key) -> Range<Key> {
key..key.next()
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]

View File

@@ -2,10 +2,9 @@
//! page server.
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -31,7 +30,6 @@ use crate::metrics::TENANT_MANAGER as METRICS;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
@@ -89,37 +87,10 @@ pub(crate) enum TenantsMap {
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_acquire_slot`].
Open(BTreeMap<TenantShardId, TenantSlot>),
Open(HashMap<TenantId, TenantSlot>),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
}
/// Helper for mapping shard-unaware functions to a sharding-aware map
/// TODO(sharding): all users of this must be made shard-aware.
fn exactly_one_or_none<'a>(
map: &'a BTreeMap<TenantShardId, TenantSlot>,
tenant_id: &TenantId,
) -> Option<(&'a TenantShardId, &'a TenantSlot)> {
let mut slots = map.range(TenantShardId::tenant_range(*tenant_id));
// Retrieve the first two slots in the range: if both are populated, we must panic because the caller
// needs a shard-naive view of the world in which only one slot can exist for a TenantId at a time.
let slot_a = slots.next();
let slot_b = slots.next();
match (slot_a, slot_b) {
(None, None) => None,
(Some(slot), None) => {
// Exactly one matching slot
Some(slot)
}
(Some(_slot_a), Some(_slot_b)) => {
// Multiple shards for this tenant: cannot handle this yet.
// TODO(sharding): callers of get() should be shard-aware.
todo!("Attaching multiple shards in teh same tenant to the same pageserver")
}
(None, Some(_)) => unreachable!(),
}
ShuttingDown(HashMap<TenantId, TenantSlot>),
}
impl TenantsMap {
@@ -130,8 +101,7 @@ impl TenantsMap {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
// TODO(sharding): callers of get() should be shard-aware.
exactly_one_or_none(m, tenant_id).and_then(|(_, slot)| slot.get_attached())
m.get(tenant_id).and_then(TenantSlot::get_attached)
}
}
}
@@ -139,10 +109,7 @@ impl TenantsMap {
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<TenantSlot> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
let key = exactly_one_or_none(m, tenant_id).map(|(k, _)| *k);
key.and_then(|key| m.remove(&key))
}
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
}
}
@@ -416,7 +383,7 @@ pub async fn init_tenant_mgr(
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<TenantManager> {
let mut tenants = BTreeMap::new();
let mut tenants = HashMap::new();
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
@@ -437,7 +404,7 @@ pub async fn init_tenant_mgr(
warn!(%tenant_id, "Marking tenant broken, failed to {e:#}");
tenants.insert(
TenantShardId::unsharded(tenant_id),
tenant_id,
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_id,
@@ -460,7 +427,7 @@ pub async fn init_tenant_mgr(
// tenants, because they do no remote writes and hence require no
// generation number
info!(%tenant_id, "Loaded tenant in secondary mode");
tenants.insert(TenantShardId::unsharded(tenant_id), TenantSlot::Secondary);
tenants.insert(tenant_id, TenantSlot::Secondary);
}
LocationMode::Attached(_) => {
// TODO: augment re-attach API to enable the control plane to
@@ -503,10 +470,7 @@ pub async fn init_tenant_mgr(
&ctx,
) {
Ok(tenant) => {
tenants.insert(
TenantShardId::unsharded(tenant.tenant_id()),
TenantSlot::Attached(tenant),
);
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
}
Err(e) => {
error!(%tenant_id, "Failed to start tenant: {e:#}");
@@ -609,19 +573,19 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
let mut m = tenants.write().unwrap();
match &mut *m {
TenantsMap::Initializing => {
*m = TenantsMap::ShuttingDown(BTreeMap::default());
*m = TenantsMap::ShuttingDown(HashMap::default());
info!("tenants map is empty");
return;
}
TenantsMap::Open(tenants) => {
let mut shutdown_state = BTreeMap::new();
let mut shutdown_state = HashMap::new();
let mut total_in_progress = 0;
let mut total_attached = 0;
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
for (tenant_id, v) in tenants.drain() {
match v {
TenantSlot::Attached(t) => {
shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
shutdown_state.insert(tenant_id, TenantSlot::Attached(t.clone()));
join_set.spawn(
async move {
let freeze_and_flush = true;
@@ -640,13 +604,13 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
// going to log too many lines
debug!("tenant successfully stopped");
}
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug())),
.instrument(info_span!("shutdown", %tenant_id)),
);
total_attached += 1;
}
TenantSlot::Secondary => {
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary);
shutdown_state.insert(tenant_id, TenantSlot::Secondary);
}
TenantSlot::InProgress(notify) => {
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
@@ -726,22 +690,19 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
pub(crate) async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
generation: Generation,
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
// TODO(sharding): make local paths shard-aware
let tenant_path =
super::create_tenant_files(conf, &location_conf, &tenant_shard_id.tenant_id).await?;
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?;
let created_tenant = tenant_spawn(
conf,
tenant_shard_id.tenant_id,
tenant_id,
&tenant_path,
resources,
AttachedTenantConf::try_from(location_conf)?,
@@ -754,7 +715,11 @@ pub(crate) async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant_id = created_tenant.tenant_id();
debug_assert_eq!(created_tenant_id, tenant_shard_id.tenant_id);
if tenant_id != created_tenant_id {
return Err(TenantMapInsertError::Other(anyhow::anyhow!(
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {created_tenant_id})",
)));
}
slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?;
@@ -790,70 +755,21 @@ pub(crate) async fn set_new_tenant_config(
}
impl TenantManager {
/// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or is not fitting to the query.
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
///
/// This method is cancel-safe.
pub(crate) fn get_attached_tenant_shard(
&self,
tenant_shard_id: TenantShardId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
TenantState::Broken {
reason,
backtrace: _,
} if active_only => Err(GetTenantError::Broken(reason)),
TenantState::Active => Ok(Arc::clone(tenant)),
_ => {
if active_only {
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
} else {
Ok(Arc::clone(tenant))
}
}
},
Some(TenantSlot::InProgress(_)) => {
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
}
None | Some(TenantSlot::Secondary) => {
Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
}
}
}
pub(crate) async fn delete_timeline(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
let tenant = self.get_attached_tenant_shard(tenant_shard_id, true)?;
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
Ok(())
}
#[instrument(skip_all, fields(%tenant_id))]
pub(crate) async fn upsert_location(
&self,
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
new_location_config: LocationConf,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
debug_assert_current_span_has_tenant_id();
info!("configuring tenant location to state {new_location_config:?}");
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
// then we do not need to set the slot to InProgress, we can just call into the
// existng tenant.
{
let locked = self.tenants.read().unwrap();
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Write)?;
match (&new_location_config.mode, peek_slot) {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
if attach_conf.generation == tenant.generation {
@@ -884,7 +800,7 @@ impl TenantManager {
// the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
// the state is ill-defined while we're in transition. Transitions are async, but fast: we do
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?;
if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
// The case where we keep a Tenant alive was covered above in the special case
@@ -915,31 +831,25 @@ impl TenantManager {
slot_guard.drop_old_value().expect("We just shut it down");
}
// TODO(sharding): make local paths sharding-aware
let tenant_path = self.conf.tenant_path(&tenant_shard_id.tenant_id);
let tenant_path = self.conf.tenant_path(&tenant_id);
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => {
let tenant_path = self.conf.tenant_path(&tenant_id);
// Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured.
unsafe_create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {tenant_path}"))?;
// TODO(sharding): make local paths sharding-aware
Tenant::persist_tenant_config(
self.conf,
&tenant_shard_id.tenant_id,
&new_location_config,
)
.await
.map_err(SetNewTenantConfigError::Persist)?;
Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
TenantSlot::Secondary
}
LocationMode::Attached(_attach_config) => {
// TODO(sharding): make local paths sharding-aware
let timelines_path = self.conf.timelines_path(&tenant_shard_id.tenant_id);
let timelines_path = self.conf.timelines_path(&tenant_id);
// Directory doesn't need to be fsync'd because we do not depend on
// it to exist after crashes: it may be recreated when tenant is
@@ -948,19 +858,13 @@ impl TenantManager {
.await
.with_context(|| format!("Creating {timelines_path}"))?;
// TODO(sharding): make local paths sharding-aware
Tenant::persist_tenant_config(
self.conf,
&tenant_shard_id.tenant_id,
&new_location_config,
)
.await
.map_err(SetNewTenantConfigError::Persist)?;
Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
// TODO(sharding): make spawn sharding-aware
let tenant = tenant_spawn(
self.conf,
tenant_shard_id.tenant_id,
tenant_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(new_location_config)?,
@@ -1006,11 +910,7 @@ pub(crate) fn get_tenant(
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let locked = TENANTS.read().unwrap();
// TODO(sharding): make all callers of get_tenant shard-aware
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
@@ -1070,16 +970,12 @@ pub(crate) async fn get_active_tenant_with_timeout(
Tenant(Arc<Tenant>),
}
// TODO(sharding): make page service interface sharding-aware (page service should apply ShardIdentity to the key
// to decide which shard services the request)
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let wait_start = Instant::now();
let deadline = wait_start + timeout;
let wait_for = {
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => {
@@ -1123,9 +1019,8 @@ pub(crate) async fn get_active_tenant_with_timeout(
})?;
{
let locked = TENANTS.read().unwrap();
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => tenant.clone(),
_ => {
@@ -1167,7 +1062,7 @@ pub(crate) async fn get_active_tenant_with_timeout(
pub(crate) async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
) -> Result<(), DeleteTenantError> {
// We acquire a SlotGuard during this function to protect against concurrent
// changes while the ::prepare phase of DeleteTenantFlow executes, but then
@@ -1180,9 +1075,7 @@ pub(crate) async fn delete_tenant(
//
// See https://github.com/neondatabase/neon/issues/5080
// TODO(sharding): make delete API sharding-aware
let mut slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustExist)?;
// unwrap is safe because we used MustExist mode when acquiring
let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
@@ -1209,6 +1102,16 @@ pub(crate) enum DeleteTimelineError {
Timeline(#[from] crate::tenant::DeleteTimelineError),
}
pub(crate) async fn delete_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
let tenant = get_tenant(tenant_id, true)?;
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantStateError {
#[error("Tenant {0} is stopping")]
@@ -1223,14 +1126,14 @@ pub(crate) enum TenantStateError {
pub(crate) async fn detach_tenant(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_shard_id,
tenant_id,
detach_ignored,
deletion_queue_client,
)
@@ -1257,24 +1160,19 @@ pub(crate) async fn detach_tenant(
async fn detach_tenant0(
conf: &'static PageServerConf,
tenants: &std::sync::RwLock<TenantsMap>,
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
// TODO(sharding): make local path helpers shard-aware
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean.tenant_id);
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
safe_rename_tenant_dir(&local_tenant_directory)
.await
.with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
};
let removal_result = remove_tenant_from_memory(
tenants,
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
@@ -1288,15 +1186,12 @@ async fn detach_tenant0(
Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
)
{
// TODO(sharding): make local paths sharding-aware
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id.tenant_id);
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
if tenant_ignore_mark.exists() {
info!("Detaching an ignored tenant");
let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
let tmp_path = tenant_dir_rename_operation(tenant_id)
.await
.with_context(|| {
format!("Ignored tenant {tenant_shard_id} local directory rename")
})?;
.with_context(|| format!("Ignored tenant {tenant_id} local directory rename"))?;
return Ok(tmp_path);
}
}
@@ -1313,11 +1208,7 @@ pub(crate) async fn load_tenant(
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
let tenant_path = conf.tenant_path(&tenant_id);
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
@@ -1370,10 +1261,7 @@ async fn ignore_tenant0(
tenants: &std::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
remove_tenant_from_memory(tenants, tenant_shard_id, async {
remove_tenant_from_memory(tenants, tenant_id, async {
let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id);
fs::File::create(&ignore_mark_file)
.await
@@ -1382,7 +1270,7 @@ async fn ignore_tenant0(
crashsafe::fsync_file_and_parent(&ignore_mark_file)
.context("Failed to fsync ignore mark file")
})
.with_context(|| format!("Failed to crate ignore mark for tenant {tenant_shard_id}"))?;
.with_context(|| format!("Failed to crate ignore mark for tenant {tenant_id}"))?;
Ok(())
})
.await
@@ -1405,12 +1293,10 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((id, tenant.current_state())),
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
TenantSlot::Secondary => None,
TenantSlot::InProgress(_) => None,
})
// TODO(sharding): make callers of this function shard-aware
.map(|(k, v)| (k.tenant_id, v))
.collect())
}
@@ -1426,11 +1312,7 @@ pub(crate) async fn attach_tenant(
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
@@ -1477,14 +1359,14 @@ pub(crate) enum TenantMapInsertError {
pub enum TenantSlotError {
/// When acquiring a slot with the expectation that the tenant already exists.
#[error("Tenant {0} not found")]
NotFound(TenantShardId),
NotFound(TenantId),
/// When acquiring a slot with the expectation that the tenant does not already exist.
#[error("tenant {0} already exists, state: {1:?}")]
AlreadyExists(TenantShardId, TenantState),
AlreadyExists(TenantId, TenantState),
#[error("tenant {0} already exists in but is not attached")]
Conflict(TenantShardId),
Conflict(TenantId),
// Tried to read a slot that is currently being mutated by another administrative
// operation.
@@ -1546,7 +1428,7 @@ pub enum TenantMapError {
/// `drop_old_value`. It is an error to call this without shutting down
/// the conents of `old_value`.
pub struct SlotGuard {
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
old_value: Option<TenantSlot>,
upserted: bool,
@@ -1557,12 +1439,12 @@ pub struct SlotGuard {
impl SlotGuard {
fn new(
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
old_value: Option<TenantSlot>,
completion: utils::completion::Completion,
) -> Self {
Self {
tenant_shard_id,
tenant_id,
old_value,
upserted: false,
_completion: completion,
@@ -1605,7 +1487,7 @@ impl SlotGuard {
TenantsMap::Open(m) => m,
};
let replaced = m.insert(self.tenant_shard_id, new_value);
let replaced = m.insert(self.tenant_id, new_value);
self.upserted = true;
METRICS.tenant_slots.set(m.len() as u64);
@@ -1624,7 +1506,7 @@ impl SlotGuard {
None => {
METRICS.unexpected_errors.inc();
error!(
tenant_shard_id = %self.tenant_shard_id,
tenant_id = %self.tenant_id,
"Missing InProgress marker during tenant upsert, this is a bug."
);
Err(TenantSlotUpsertError::InternalError(
@@ -1633,7 +1515,7 @@ impl SlotGuard {
}
Some(slot) => {
METRICS.unexpected_errors.inc();
error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
Err(TenantSlotUpsertError::InternalError(
"Unexpected contents of TenantSlot".into(),
))
@@ -1711,12 +1593,12 @@ impl Drop for SlotGuard {
TenantsMap::Open(m) => m,
};
use std::collections::btree_map::Entry;
match m.entry(self.tenant_shard_id) {
use std::collections::hash_map::Entry;
match m.entry(self.tenant_id) {
Entry::Occupied(mut entry) => {
if !matches!(entry.get(), TenantSlot::InProgress(_)) {
METRICS.unexpected_errors.inc();
error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
}
if self.old_value_is_shutdown() {
@@ -1728,7 +1610,7 @@ impl Drop for SlotGuard {
Entry::Vacant(_) => {
METRICS.unexpected_errors.inc();
error!(
tenant_shard_id = %self.tenant_shard_id,
tenant_id = %self.tenant_id,
"Missing InProgress marker during SlotGuard drop, this is a bug."
);
}
@@ -1747,7 +1629,7 @@ enum TenantSlotPeekMode {
fn tenant_map_peek_slot<'a>(
tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
tenant_shard_id: &TenantShardId,
tenant_id: &TenantId,
mode: TenantSlotPeekMode,
) -> Result<Option<&'a TenantSlot>, TenantMapError> {
let m = match tenants.deref() {
@@ -1761,7 +1643,7 @@ fn tenant_map_peek_slot<'a>(
TenantsMap::Open(m) => m,
};
Ok(m.get(tenant_shard_id))
Ok(m.get(tenant_id))
}
enum TenantSlotAcquireMode {
@@ -1774,14 +1656,14 @@ enum TenantSlotAcquireMode {
}
fn tenant_map_acquire_slot(
tenant_shard_id: &TenantShardId,
tenant_id: &TenantId,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
tenant_map_acquire_slot_impl(tenant_id, &TENANTS, mode)
}
fn tenant_map_acquire_slot_impl(
tenant_shard_id: &TenantShardId,
tenant_id: &TenantId,
tenants: &std::sync::RwLock<TenantsMap>,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
@@ -1789,7 +1671,7 @@ fn tenant_map_acquire_slot_impl(
METRICS.tenant_slot_writes.inc();
let mut locked = tenants.write().unwrap();
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard=tenant_shard_id.shard_slug());
let span = tracing::info_span!("acquire_slot", %tenant_id);
let _guard = span.enter();
let m = match &mut *locked {
@@ -1798,21 +1680,19 @@ fn tenant_map_acquire_slot_impl(
TenantsMap::Open(m) => m,
};
use std::collections::btree_map::Entry;
let entry = m.entry(*tenant_shard_id);
use std::collections::hash_map::Entry;
let entry = m.entry(*tenant_id);
match entry {
Entry::Vacant(v) => match mode {
MustExist => {
tracing::debug!("Vacant && MustExist: return NotFound");
Err(TenantSlotError::NotFound(*tenant_shard_id))
Err(TenantSlotError::NotFound(*tenant_id))
}
_ => {
let (completion, barrier) = utils::completion::channel();
v.insert(TenantSlot::InProgress(barrier));
tracing::debug!("Vacant, inserted InProgress");
Ok(SlotGuard::new(*tenant_shard_id, None, completion))
Ok(SlotGuard::new(*tenant_id, None, completion))
}
},
Entry::Occupied(mut o) => {
@@ -1826,7 +1706,7 @@ fn tenant_map_acquire_slot_impl(
TenantSlot::Attached(tenant) => {
tracing::debug!("Attached && MustNotExist, return AlreadyExists");
Err(TenantSlotError::AlreadyExists(
*tenant_shard_id,
*tenant_id,
tenant.current_state(),
))
}
@@ -1835,7 +1715,7 @@ fn tenant_map_acquire_slot_impl(
// to get the state from
tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
Err(TenantSlotError::AlreadyExists(
*tenant_shard_id,
*tenant_id,
TenantState::Broken {
reason: "Present but not attached".to_string(),
backtrace: "".to_string(),
@@ -1848,11 +1728,7 @@ fn tenant_map_acquire_slot_impl(
let (completion, barrier) = utils::completion::channel();
let old_value = o.insert(TenantSlot::InProgress(barrier));
tracing::debug!("Occupied, replaced with InProgress");
Ok(SlotGuard::new(
*tenant_shard_id,
Some(old_value),
completion,
))
Ok(SlotGuard::new(*tenant_id, Some(old_value), completion))
}
}
}
@@ -1865,7 +1741,7 @@ fn tenant_map_acquire_slot_impl(
/// operation would be needed to remove it.
async fn remove_tenant_from_memory<V, F>(
tenants: &std::sync::RwLock<TenantsMap>,
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
tenant_cleanup: F,
) -> Result<V, TenantStateError>
where
@@ -1874,7 +1750,7 @@ where
use utils::completion;
let mut slot_guard =
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?;
// The SlotGuard allows us to manipulate the Tenant object without fear of some
// concurrent API request doing something else for the same tenant ID.
@@ -1901,7 +1777,7 @@ where
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
slot_guard.revert();
return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id));
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
@@ -1912,7 +1788,7 @@ where
match tenant_cleanup
.await
.with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
.with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
{
Ok(hook_value) => {
// Success: drop the old TenantSlot::Attached.
@@ -1991,8 +1867,7 @@ pub(crate) async fn immediate_gc(
#[cfg(test)]
mod tests {
use pageserver_api::shard::TenantShardId;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{info_span, Instrument};
@@ -2012,12 +1887,12 @@ mod tests {
// harness loads it to active, which is forced and nothing is running on the tenant
let id = TenantShardId::unsharded(t.tenant_id());
let id = t.tenant_id();
// tenant harness configures the logging and we cannot escape it
let _e = info_span!("testing", tenant_id = %id).entered();
let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
// Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually

View File

@@ -1,9 +1,7 @@
//! User credentials used in authentication.
use crate::{
auth::password_hack::parse_endpoint_param,
error::UserFacingError,
proxy::{neon_options, NUM_CONNECTION_ACCEPTED_BY_SNI},
auth::password_hack::parse_endpoint_param, error::UserFacingError, proxy::neon_options,
};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -126,22 +124,6 @@ impl<'a> ClientCredentials<'a> {
.transpose()?;
info!(user, project = project.as_deref(), "credentials");
if sni.is_some() {
info!("Connection with sni");
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["sni"])
.inc();
} else if project.is_some() {
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["no_sni"])
.inc();
info!("Connection without sni");
} else {
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["password_hack"])
.inc();
info!("Connection with password hack");
}
let cache_key = format!(
"{}{}",

View File

@@ -129,15 +129,6 @@ pub static RATE_LIMITER_LIMIT: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});
pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_by_sni",
"Number of connections (per sni).",
&["kind"],
)
.unwrap()
});
pub struct LatencyTimer {
// time since the stopwatch was started
start: Option<Instant>,

View File

@@ -10,18 +10,13 @@ use anyhow::bail;
use hyper::StatusCode;
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::task::JoinSet;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp};
use crate::protocol2::ProxyProtocolAccept;
use crate::proxy::{NUM_CLIENT_CONNECTION_CLOSED_COUNTER, NUM_CLIENT_CONNECTION_OPENED_COUNTER};
use crate::{cancellation::CancelMap, config::ProxyConfig};
use futures::StreamExt;
use hyper::{
server::{
accept,
conn::{AddrIncoming, AddrStream},
},
Body, Method, Request, Response,
};
use hyper::{server::conn::AddrIncoming, Body, Method, Request, Response};
use std::task::Poll;
use std::{future::ready, sync::Arc};
@@ -69,7 +64,7 @@ pub async fn task_main(
incoming: addr_incoming,
};
let tls_listener = TlsListener::new(tls_acceptor, addr_incoming).filter(|conn| {
let mut tls_listener = TlsListener::new(tls_acceptor, addr_incoming).filter(|conn| {
if let Err(err) = conn {
error!("failed to accept TLS connection for websockets: {err:?}");
ready(false)
@@ -78,49 +73,71 @@ pub async fn task_main(
}
});
let make_svc = hyper::service::make_service_fn(
|stream: &tokio_rustls::server::TlsStream<WithClientIp<AddrStream>>| {
let (io, tls) = stream.get_ref();
let client_addr = io.client_addr();
let remote_addr = io.inner.remote_addr();
let sni_name = tls.server_name().map(|s| s.to_string());
let conn_pool = conn_pool.clone();
let mut connections = JoinSet::new();
loop {
tokio::select! {
Some(tls_stream) = tls_listener.next() => {
let tls_stream = tls_stream?;
let (io, tls) = tls_stream.get_ref();
let client_addr = io.client_addr();
let remote_addr = io.inner.remote_addr();
let sni_name = tls.server_name().map(|s| s.to_string());
let conn_pool = conn_pool.clone();
async move {
let peer_addr = match client_addr {
Some(addr) => addr,
None if config.require_client_ip => bail!("missing required client ip"),
None => remote_addr,
};
Ok(MetricService::new(hyper::service::service_fn(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
let service = MetricService::new(hyper::service::service_fn(move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
request_handler(
req, config, conn_pool, cancel_map, session_id, sni_name,
)
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
request_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"serverless",
session = %session_id,
%peer_addr,
))
.await
}
},
)))
}
},
);
}
}));
hyper::Server::builder(accept::from_stream(tls_listener))
.serve(make_svc)
.with_graceful_shutdown(cancellation_token.cancelled())
.await?;
connections.spawn(async move {
// todo(conrad): http2?
if let Err(err) = hyper::server::conn::http1::Builder::new()
.serve_connection(tls_stream, service)
.await
{
println!("Error serving connection: {:?}", err);
}
});
}
Some(Err(e)) = connections.join_next(), if !connections.is_empty() => {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
_ = cancellation_token.cancelled() => {
drop(tls_listener);
break;
}
}
}
// Drain connections
while let Some(res) = connections.join_next().await {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
Ok(())
}

View File

@@ -210,7 +210,12 @@ impl GlobalConnPool {
client.session.send(session_id)?;
latency_timer.pool_hit();
latency_timer.success();
return Ok(Client::new(client, pool).await);
return Ok(Client {
conn_id: client.conn_id,
inner: Some(client),
span: Span::current(),
pool,
});
}
} else {
let conn_id = uuid::Uuid::new_v4();
@@ -258,11 +263,15 @@ impl GlobalConnPool {
_ => {}
}
// new_client.map(|inner| Client::new(inner, pool).await)
Ok(Client::new(new_client?, pool).await)
new_client.map(|inner| Client {
conn_id: inner.conn_id,
inner: Some(inner),
span: Span::current(),
pool,
})
}
fn put(&self, conn_info: &ConnInfo, client: ClientInner, pid: i32) -> anyhow::Result<()> {
fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
let conn_id = client.conn_id;
// We want to hold this open while we return. This ensures that the pool can't close
@@ -306,9 +315,9 @@ impl GlobalConnPool {
// do logging outside of the mutex
if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}, pid={pid}");
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}, pid={pid}");
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
Ok(())
@@ -519,22 +528,6 @@ struct ClientInner {
conn_id: uuid::Uuid,
}
impl ClientInner {
pub async fn get_pid(&mut self) -> anyhow::Result<i32> {
let rows = self.inner.query("select pg_backend_pid();", &[]).await?;
if rows.len() != 1 {
Err(anyhow::anyhow!(
"expected 1 row from pg_backend_pid(), got {}",
rows.len()
))
} else {
let pid = rows[0].get(0);
info!(%pid, "got pid");
Ok(pid)
}
}
}
impl Client {
pub fn metrics(&self) -> Arc<MetricCounter> {
USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone())
@@ -546,7 +539,6 @@ pub struct Client {
span: Span,
inner: Option<ClientInner>,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
pid: i32,
}
pub struct Discard<'a> {
@@ -555,25 +547,12 @@ pub struct Discard<'a> {
}
impl Client {
pub(self) async fn new(
mut inner: ClientInner,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
) -> Self {
Self {
conn_id: inner.conn_id,
pid: inner.get_pid().await.unwrap_or(-1),
inner: Some(inner),
span: Span::current(),
pool,
}
}
pub fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
let Self {
inner,
pool,
conn_id,
span: _,
pid: _,
} = self;
(
&mut inner
@@ -630,11 +609,10 @@ impl Drop for Client {
.expect("client inner should not be removed");
if let Some((conn_info, conn_pool)) = self.pool.take() {
let current_span = self.span.clone();
let pid = self.pid;
// return connection to the pool
tokio::task::spawn_blocking(move || {
let _span = current_span.enter();
let _ = conn_pool.put(&conn_info, client, pid);
let _ = conn_pool.put(&conn_info, client);
});
}
}

View File

@@ -1,143 +0,0 @@
# Supplemental file for neondatabase/autoscaling's vm-builder, for producing the VM compute image.
---
commands:
- name: cgconfigparser
user: root
sysvInitAction: sysinit
shell: "cgconfigparser -l /etc/cgconfig.conf -s 1664"
- name: pgbouncer
user: nobody
sysvInitAction: respawn
shell: "/usr/local/bin/pgbouncer /etc/pgbouncer.ini"
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter --extend.query-path /etc/postgres_exporter_queries.yml'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
- filename: pgbouncer.ini
content: |
[databases]
*=host=localhost port=5432 auth_user=cloud_admin
[pgbouncer]
listen_port=6432
listen_addr=0.0.0.0
auth_type=scram-sha-256
auth_user=cloud_admin
auth_dbname=postgres
client_tls_sslmode=disable
server_tls_sslmode=disable
pool_mode=transaction
max_client_conn=10000
default_pool_size=16
max_prepared_statements=0
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
group neon-postgres {
perm {
admin {
uid = postgres;
}
task {
gid = users;
}
}
memory {}
}
- filename: postgres_exporter_queries.yml
content: |
postgres_exporter_pg_database_size:
query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as bytes, 42 as fourtytwo FROM pg_database"
cache_seconds: 30
metrics:
- datname:
usage: "LABEL"
description: "Name of the database"
- bytes:
usage: "GAUGE"
description: "Disk space used by the database"
- fourtytwo:
usage: "GAUGE"
description: "fourtytwo"
build: |
# Build cgroup-tools
#
# At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically
# libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-monitor
# requires cgroup v2, so we'll build cgroup-tools ourselves.
FROM debian:bullseye-slim as libcgroup-builder
ENV LIBCGROUP_VERSION v2.0.3
RUN set -exu \
&& apt update \
&& apt install --no-install-recommends -y \
git \
ca-certificates \
automake \
cmake \
make \
gcc \
byacc \
flex \
libtool \
libpam0g-dev \
&& git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \
&& INSTALL_DIR="/libcgroup-install" \
&& mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \
&& cd libcgroup \
# extracted from bootstrap.sh, with modified flags:
&& (test -d m4 || mkdir m4) \
&& autoreconf -fi \
&& rm -rf autom4te.cache \
&& CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \
# actually build the thing...
&& make install
FROM quay.io/prometheuscommunity/postgres-exporter:v0.12.0 AS postgres-exporter
# Build pgbouncer
#
FROM debian:bullseye-slim AS pgbouncer
RUN set -e \
&& apt-get update \
&& apt-get install -y \
curl \
build-essential \
pkg-config \
libevent-dev \
libssl-dev
ENV PGBOUNCER_VERSION 1.21.0
ENV PGBOUNCER_GITPATH 1_21_0
RUN set -e \
&& curl -sfSL https://github.com/pgbouncer/pgbouncer/releases/download/pgbouncer_${PGBOUNCER_GITPATH}/pgbouncer-${PGBOUNCER_VERSION}.tar.gz -o pgbouncer-${PGBOUNCER_VERSION}.tar.gz \
&& tar xzvf pgbouncer-${PGBOUNCER_VERSION}.tar.gz \
&& cd pgbouncer-${PGBOUNCER_VERSION} \
&& LDFLAGS=-static ./configure --prefix=/usr/local/pgbouncer --without-openssl \
&& make -j $(nproc) \
&& make install
merge: |
# tweak nofile limits
RUN set -e \
&& echo 'fs.file-max = 1048576' >>/etc/sysctl.conf \
&& test ! -e /etc/security || ( \
echo '* - nofile 1048576' >>/etc/security/limits.conf \
&& echo 'root - nofile 1048576' >>/etc/security/limits.conf \
)
COPY cgconfig.conf /etc/cgconfig.conf
COPY pgbouncer.ini /etc/pgbouncer.ini
COPY postgres_exporter_queries.yml /etc/postgres_exporter_queries.yml
RUN set -e \
&& chown postgres:postgres /etc/pgbouncer.ini \
&& chmod 0644 /etc/pgbouncer.ini \
&& chmod 0644 /etc/cgconfig.conf \
&& chmod 0644 /etc/postgres_exporter_queries.yml
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/
COPY --from=postgres-exporter /bin/postgres_exporter /bin/postgres_exporter
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer

View File

@@ -36,7 +36,7 @@ futures-io = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hex = { version = "0.4", features = ["serde"] }
hyper = { version = "0.14", features = ["full"] }
hyper = { version = "0.14", features = ["backports", "full"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }