Mirror of https://github.com/neondatabase/neon.git

Compare commits: hyper1...hack/compu (9 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 1bf5e07da1 |  |
|  | 87de91b004 |  |
|  | 2f217f9ebd |  |
|  | 71491dd467 |  |
|  | 67e791c4ec |  |
|  | 517782ab94 |  |
|  | d0a842a509 |  |
|  | 6b82f22ada |  |
|  | ab631e6792 |  |
.github/workflows/build_and_test.yml (vendored, 5 lines changed)
@@ -852,7 +852,7 @@ jobs:
       run:
         shell: sh -eu {0}
     env:
-      VM_BUILDER_VERSION: v0.18.5
+      VM_BUILDER_VERSION: v0.19.0

     steps:
       - name: Checkout
@@ -874,8 +874,7 @@ jobs:
       - name: Build vm image
         run: |
           ./vm-builder \
-            -enable-file-cache \
-            -cgroup-uid=postgres \
+            -spec=vm-image-spec.yaml \
            -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
            -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}

Cargo.lock (generated, 2 lines changed)
@@ -2994,10 +2994,12 @@ name = "pageserver_api"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "bincode",
  "byteorder",
  "bytes",
  "const_format",
  "enum-map",
+ "hex",
  "postgres_ffi",
  "serde",
  "serde_json",

@@ -714,6 +714,23 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -
     cargo pgrx install --release && \
     echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control

+#########################################################################################
+#
+# Layer "pg-wait-sampling-pg-build"
+# compile pg_wait_sampling extension
+#
+#########################################################################################
+FROM build-deps AS pg-wait-sampling-pg-build
+COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
+
+ENV PATH "/usr/local/pgsql/bin/:$PATH"
+RUN wget https://github.com/postgrespro/pg_wait_sampling/archive/refs/tags/v1.1.5.tar.gz -O pg_wait_sampling.tar.gz && \
+    echo 'a03da6a413f5652ce470a3635ed6ebba528c74cb26aa4cfced8aff8a8441f81ec6dd657ff62cd6ce96a4e6ce02cad9f2519ae9525367ece60497aa20faafde5c pg_wait_sampling.tar.gz' | sha512sum -c && \
+    mkdir pg_wait_sampling-src && cd pg_wait_sampling-src && tar xvzf ../pg_wait_sampling.tar.gz --strip-components=1 -C . && \
+    make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) && \
+    make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) install && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_wait_sampling.control
+
 #########################################################################################
 #
 # Layer "neon-pg-ext-build"
@@ -750,6 +767,7 @@ COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
+COPY --from=pg-wait-sampling-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY pgxn/ pgxn/

 RUN make -j $(getconf _NPROCESSORS_ONLN) \

@@ -18,6 +18,7 @@ use camino::Utf8PathBuf;
 use pageserver_api::models::{
     self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo,
 };
+use pageserver_api::shard::TenantShardId;
 use postgres_backend::AuthType;
 use postgres_connection::{parse_host_port, PgConnectionConfig};
 use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -408,7 +409,7 @@ impl PageServerNode {
         };

         let request = models::TenantCreateRequest {
-            new_tenant_id,
+            new_tenant_id: TenantShardId::unsharded(new_tenant_id),
             generation,
             config,
         };

@@ -17,5 +17,9 @@ postgres_ffi.workspace = true
 enum-map.workspace = true
 strum.workspace = true
 strum_macros.workspace = true
+hex.workspace = true

 workspace_hack.workspace = true
+
+[dev-dependencies]
+bincode.workspace = true

libs/pageserver_api/src/key.rs (new file, 142 lines)
@@ -0,0 +1,142 @@
+use anyhow::{bail, Result};
+use byteorder::{ByteOrder, BE};
+use serde::{Deserialize, Serialize};
+use std::fmt;
+
+/// Key used in the Repository kv-store.
+///
+/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
+/// for what we actually store in these fields.
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
+pub struct Key {
+    pub field1: u8,
+    pub field2: u32,
+    pub field3: u32,
+    pub field4: u32,
+    pub field5: u8,
+    pub field6: u32,
+}
+
+pub const KEY_SIZE: usize = 18;
+
+impl Key {
+    /// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
+    /// As long as Neon does not support tablespace (because of lack of access to local file system),
+    /// we can assume that only some predefined namespace OIDs are used which can fit in u16
+    pub fn to_i128(&self) -> i128 {
+        assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
+        (((self.field1 & 0xf) as i128) << 120)
+            | (((self.field2 & 0xFFFF) as i128) << 104)
+            | ((self.field3 as i128) << 72)
+            | ((self.field4 as i128) << 40)
+            | ((self.field5 as i128) << 32)
+            | self.field6 as i128
+    }
+
+    pub const fn from_i128(x: i128) -> Self {
+        Key {
+            field1: ((x >> 120) & 0xf) as u8,
+            field2: ((x >> 104) & 0xFFFF) as u32,
+            field3: (x >> 72) as u32,
+            field4: (x >> 40) as u32,
+            field5: (x >> 32) as u8,
+            field6: x as u32,
+        }
+    }
+
+    pub fn next(&self) -> Key {
+        self.add(1)
+    }
+
+    pub fn add(&self, x: u32) -> Key {
+        let mut key = *self;
+
+        let r = key.field6.overflowing_add(x);
+        key.field6 = r.0;
+        if r.1 {
+            let r = key.field5.overflowing_add(1);
+            key.field5 = r.0;
+            if r.1 {
+                let r = key.field4.overflowing_add(1);
+                key.field4 = r.0;
+                if r.1 {
+                    let r = key.field3.overflowing_add(1);
+                    key.field3 = r.0;
+                    if r.1 {
+                        let r = key.field2.overflowing_add(1);
+                        key.field2 = r.0;
+                        if r.1 {
+                            let r = key.field1.overflowing_add(1);
+                            key.field1 = r.0;
+                            assert!(!r.1);
+                        }
+                    }
+                }
+            }
+        }
+        key
+    }
+
+    pub fn from_slice(b: &[u8]) -> Self {
+        Key {
+            field1: b[0],
+            field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
+            field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
+            field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
+            field5: b[13],
+            field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
+        }
+    }
+
+    pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
+        buf[0] = self.field1;
+        BE::write_u32(&mut buf[1..5], self.field2);
+        BE::write_u32(&mut buf[5..9], self.field3);
+        BE::write_u32(&mut buf[9..13], self.field4);
+        buf[13] = self.field5;
+        BE::write_u32(&mut buf[14..18], self.field6);
+    }
+}
+
+impl fmt::Display for Key {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
+            self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
+        )
+    }
+}
+
+impl Key {
+    pub const MIN: Key = Key {
+        field1: u8::MIN,
+        field2: u32::MIN,
+        field3: u32::MIN,
+        field4: u32::MIN,
+        field5: u8::MIN,
+        field6: u32::MIN,
+    };
+    pub const MAX: Key = Key {
+        field1: u8::MAX,
+        field2: u32::MAX,
+        field3: u32::MAX,
+        field4: u32::MAX,
+        field5: u8::MAX,
+        field6: u32::MAX,
+    };
+
+    pub fn from_hex(s: &str) -> Result<Self> {
+        if s.len() != 36 {
+            bail!("parse error");
+        }
+        Ok(Key {
+            field1: u8::from_str_radix(&s[0..2], 16)?,
+            field2: u32::from_str_radix(&s[2..10], 16)?,
+            field3: u32::from_str_radix(&s[10..18], 16)?,
+            field4: u32::from_str_radix(&s[18..26], 16)?,
+            field5: u8::from_str_radix(&s[26..28], 16)?,
+            field6: u32::from_str_radix(&s[28..36], 16)?,
+        })
+    }
+}
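A quick sanity check of the `Key` API defined in this new file. This is a minimal sketch, not code from the diff; the 36-character hex literal is an arbitrary made-up key:

```rust
use pageserver_api::key::Key;

fn main() -> anyhow::Result<()> {
    // 36 hex chars = KEY_SIZE (18) bytes: field1(1B) field2(4B) field3(4B) field4(4B) field5(1B) field6(4B)
    let k = Key::from_hex("000000006700000001000000020300000004")?;
    assert_eq!((k.field1, k.field5, k.field6), (0x00, 0x03, 0x04));

    // Display prints the same fixed-width hex, so it round-trips through from_hex.
    assert_eq!(Key::from_hex(&format!("{k}"))?, k);

    // add() ripple-carries: overflowing field6 bumps field5.
    let saturated = Key { field6: u32::MAX, ..k };
    let bumped = saturated.add(1);
    assert_eq!((bumped.field5, bumped.field6), (0x04, 0));
    Ok(())
}
```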
@@ -4,8 +4,10 @@ use const_format::formatcp;

 /// Public API types
 pub mod control_api;
+pub mod key;
 pub mod models;
 pub mod reltag;
+pub mod shard;

 pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
 pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

@@ -16,7 +16,7 @@ use utils::{
     lsn::Lsn,
 };

-use crate::reltag::RelTag;
+use crate::{reltag::RelTag, shard::TenantShardId};
 use anyhow::bail;
 use bytes::{BufMut, Bytes, BytesMut};

@@ -187,7 +187,7 @@ pub struct TimelineCreateRequest {
 #[derive(Serialize, Deserialize, Debug)]
 #[serde(deny_unknown_fields)]
 pub struct TenantCreateRequest {
-    pub new_tenant_id: TenantId,
+    pub new_tenant_id: TenantShardId,
     #[serde(default)]
     #[serde(skip_serializing_if = "Option::is_none")]
     pub generation: Option<u32>,

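The `new_tenant_id` type change above is wire-compatible for existing callers: an unsharded `TenantShardId` serializes to the same bare-hex string as the old `TenantId`. A hedged sketch of just that property, using `serde_json` purely for illustration:

```rust
use pageserver_api::shard::TenantShardId;
use std::str::FromStr;
use utils::id::TenantId;

fn main() {
    let tenant_id = TenantId::from_str("1f359dd625e519a1a4e8d7509690f6fc").unwrap();

    // Unsharded (shard_count == 0) serializes to the plain TenantId hex...
    let legacy = TenantShardId::unsharded(tenant_id);
    let json = serde_json::to_string(&legacy).unwrap();
    assert_eq!(json, "\"1f359dd625e519a1a4e8d7509690f6fc\"");

    // ...so a pre-sharding request body {"new_tenant_id": "<hex>"} still parses.
    let parsed: TenantShardId = serde_json::from_str(&json).unwrap();
    assert_eq!(parsed, legacy);
}
```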
libs/pageserver_api/src/shard.rs (new file, 321 lines)
@@ -0,0 +1,321 @@
+use std::{ops::RangeInclusive, str::FromStr};
+
+use hex::FromHex;
+use serde::{Deserialize, Serialize};
+use utils::id::TenantId;
+
+#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
+pub struct ShardNumber(pub u8);
+
+#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
+pub struct ShardCount(pub u8);
+
+impl ShardCount {
+    pub const MAX: Self = Self(u8::MAX);
+}
+
+impl ShardNumber {
+    pub const MAX: Self = Self(u8::MAX);
+}
+
+/// TenantShardId identifies the units of work for the Pageserver.
+///
+/// These are written as `<tenant_id>-<shard number><shard-count>`, for example:
+///
+///   # The second shard in a two-shard tenant
+///   072f1291a5310026820b2fe4b2968934-0102
+///
+/// Historically, tenants could not have multiple shards, and were identified
+/// by TenantId.  To support this, TenantShardId has a special legacy
+/// mode where `shard_count` is equal to zero: this represents a single-sharded
+/// tenant which should be written as a TenantId with no suffix.
+///
+/// The human-readable encoding of TenantShardId, such as used in API URLs,
+/// is both forward and backward compatible: a legacy TenantId can be
+/// decoded as a TenantShardId, and when re-encoded it will be parseable
+/// as a TenantId.
+///
+/// Note that the binary encoding is _not_ backward compatible, because
+/// at the time sharding is introduced, there are no existing binary structures
+/// containing TenantId that we need to handle.
+#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
+pub struct TenantShardId {
+    pub tenant_id: TenantId,
+    pub shard_number: ShardNumber,
+    pub shard_count: ShardCount,
+}
+
+impl TenantShardId {
+    pub fn unsharded(tenant_id: TenantId) -> Self {
+        Self {
+            tenant_id,
+            shard_number: ShardNumber(0),
+            shard_count: ShardCount(0),
+        }
+    }
+
+    /// The range of all TenantShardId that belong to a particular TenantId.  This is useful when
+    /// you have a BTreeMap of TenantShardId, and are querying by TenantId.
+    pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
+        RangeInclusive::new(
+            Self {
+                tenant_id,
+                shard_number: ShardNumber(0),
+                shard_count: ShardCount(0),
+            },
+            Self {
+                tenant_id,
+                shard_number: ShardNumber::MAX,
+                shard_count: ShardCount::MAX,
+            },
+        )
+    }
+
+    pub fn shard_slug(&self) -> String {
+        format!("{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
+    }
+}
+
+impl std::fmt::Display for TenantShardId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        if self.shard_count != ShardCount(0) {
+            write!(
+                f,
+                "{}-{:02x}{:02x}",
+                self.tenant_id, self.shard_number.0, self.shard_count.0
+            )
+        } else {
+            // Legacy case (shard_count == 0) -- format as just the tenant id.  Note that this
+            // is distinct from the normal single shard case (shard count == 1).
+            self.tenant_id.fmt(f)
+        }
+    }
+}
+
+impl std::fmt::Debug for TenantShardId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // Debug is the same as Display: the compact hex representation
+        write!(f, "{}", self)
+    }
+}
+
+impl std::str::FromStr for TenantShardId {
+    type Err = hex::FromHexError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
+        if s.len() == 32 {
+            // Legacy case: no shard specified
+            Ok(Self {
+                tenant_id: TenantId::from_str(s)?,
+                shard_number: ShardNumber(0),
+                shard_count: ShardCount(0),
+            })
+        } else if s.len() == 37 {
+            let bytes = s.as_bytes();
+            let tenant_id = TenantId::from_hex(&bytes[0..32])?;
+            let mut shard_parts: [u8; 2] = [0u8; 2];
+            hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
+            Ok(Self {
+                tenant_id,
+                shard_number: ShardNumber(shard_parts[0]),
+                shard_count: ShardCount(shard_parts[1]),
+            })
+        } else {
+            Err(hex::FromHexError::InvalidStringLength)
+        }
+    }
+}
+
+impl From<[u8; 18]> for TenantShardId {
+    fn from(b: [u8; 18]) -> Self {
+        let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
+
+        Self {
+            tenant_id: TenantId::from(tenant_id_bytes),
+            shard_number: ShardNumber(b[16]),
+            shard_count: ShardCount(b[17]),
+        }
+    }
+}
+
+impl Serialize for TenantShardId {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        if serializer.is_human_readable() {
+            serializer.collect_str(self)
+        } else {
+            let mut packed: [u8; 18] = [0; 18];
+            packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
+            packed[16] = self.shard_number.0;
+            packed[17] = self.shard_count.0;
+
+            packed.serialize(serializer)
+        }
+    }
+}
+
+impl<'de> Deserialize<'de> for TenantShardId {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        struct IdVisitor {
+            is_human_readable_deserializer: bool,
+        }
+
+        impl<'de> serde::de::Visitor<'de> for IdVisitor {
+            type Value = TenantShardId;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+                if self.is_human_readable_deserializer {
+                    formatter.write_str("value in form of hex string")
+                } else {
+                    formatter.write_str("value in form of integer array([u8; 18])")
+                }
+            }
+
+            fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
+            where
+                A: serde::de::SeqAccess<'de>,
+            {
+                let s = serde::de::value::SeqAccessDeserializer::new(seq);
+                let id: [u8; 18] = Deserialize::deserialize(s)?;
+                Ok(TenantShardId::from(id))
+            }
+
+            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                TenantShardId::from_str(v).map_err(E::custom)
+            }
+        }
+
+        if deserializer.is_human_readable() {
+            deserializer.deserialize_str(IdVisitor {
+                is_human_readable_deserializer: true,
+            })
+        } else {
+            deserializer.deserialize_tuple(
+                18,
+                IdVisitor {
+                    is_human_readable_deserializer: false,
+                },
+            )
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::str::FromStr;
+
+    use bincode;
+    use utils::{id::TenantId, Hex};
+
+    use super::*;
+
+    const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
+
+    #[test]
+    fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
+        let example = TenantShardId {
+            tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
+            shard_count: ShardCount(10),
+            shard_number: ShardNumber(7),
+        };
+
+        let encoded = format!("{example}");
+
+        let expected = format!("{EXAMPLE_TENANT_ID}-070a");
+        assert_eq!(&encoded, &expected);
+
+        let decoded = TenantShardId::from_str(&encoded)?;
+
+        assert_eq!(example, decoded);
+
+        Ok(())
+    }
+
+    #[test]
+    fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
+        let example = TenantShardId {
+            tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
+            shard_count: ShardCount(10),
+            shard_number: ShardNumber(7),
+        };
+
+        let encoded = bincode::serialize(&example).unwrap();
+        let expected: [u8; 18] = [
+            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
+            0xf6, 0xfc, 0x07, 0x0a,
+        ];
+        assert_eq!(Hex(&encoded), Hex(&expected));
+
+        let decoded = bincode::deserialize(&encoded).unwrap();
+
+        assert_eq!(example, decoded);
+
+        Ok(())
+    }
+
+    #[test]
+    fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
+        // Test that TenantShardId can decode a TenantId in human
+        // readable form
+        let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
+        let encoded = format!("{example}");
+
+        assert_eq!(&encoded, EXAMPLE_TENANT_ID);
+
+        let decoded = TenantShardId::from_str(&encoded)?;
+
+        assert_eq!(example, decoded.tenant_id);
+        assert_eq!(decoded.shard_count, ShardCount(0));
+        assert_eq!(decoded.shard_number, ShardNumber(0));
+
+        Ok(())
+    }
+
+    #[test]
+    fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
+        // Test that a legacy TenantShardId encodes into a form that
+        // can be decoded as TenantId
+        let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
+        let example = TenantShardId::unsharded(example_tenant_id);
+        let encoded = format!("{example}");
+
+        assert_eq!(&encoded, EXAMPLE_TENANT_ID);
+
+        let decoded = TenantId::from_str(&encoded)?;
+
+        assert_eq!(example_tenant_id, decoded);
+
+        Ok(())
+    }
+
+    #[test]
+    fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
+        // Unlike in human readable encoding, binary encoding does not
+        // do any special handling of legacy unsharded TenantIds: this test
+        // is equivalent to the main test for binary encoding, just verifying
+        // that the same behavior applies when we have used `unsharded()` to
+        // construct a TenantShardId.
+        let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
+        let encoded = bincode::serialize(&example).unwrap();
+
+        let expected: [u8; 18] = [
+            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
+            0xf6, 0xfc, 0x00, 0x00,
+        ];
+        assert_eq!(Hex(&encoded), Hex(&expected));
+
+        let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
+        assert_eq!(example, decoded);
+
+        Ok(())
+    }
+}
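One property of the new type worth spelling out: the derived `Ord` compares `tenant_id` first (field declaration order), so all shards of one tenant sort contiguously, which is exactly what `tenant_range()` relies on. A small sketch using only items from this file:

```rust
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use std::str::FromStr;
use utils::id::TenantId;

fn main() {
    let t = TenantId::from_str("1f359dd625e519a1a4e8d7509690f6fc").unwrap();
    let shard = |n: u8, c: u8| TenantShardId {
        tenant_id: t,
        shard_number: ShardNumber(n),
        shard_count: ShardCount(c),
    };

    // tenant_range(t) spans from (t, 0, 0) to (t, MAX, MAX), inclusive...
    let range = TenantShardId::tenant_range(t);
    assert!(range.contains(&shard(0, 0))); // legacy unsharded form
    assert!(range.contains(&shard(1, 2))); // second shard of a two-shard tenant
    assert!(range.contains(&shard(u8::MAX, u8::MAX)));

    // ...and the slug matches the suffix used in the Display encoding.
    assert_eq!(shard(1, 2).shard_slug(), "0102");
    assert_eq!(format!("{}", shard(1, 2)), format!("{t}-0102"));
}
```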
@@ -16,6 +16,7 @@ use pageserver_api::models::{
     DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
     TenantLoadRequest, TenantLocationConfigRequest,
 };
+use pageserver_api::shard::TenantShardId;
 use remote_storage::GenericRemoteStorage;
 use tenant_size_model::{SizeResult, StorageModel};
 use tokio_util::sync::CancellationToken;
@@ -419,9 +420,9 @@ async fn timeline_create_handler(
     mut request: Request<Body>,
     _cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
-    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
-    check_permission(&request, Some(tenant_id))?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

     let new_timeline_id = request_data.new_timeline_id;

@@ -430,7 +431,7 @@ async fn timeline_create_handler(
     let state = get_state(&request);

     async {
-        let tenant = mgr::get_tenant(tenant_id, true)?;
+        let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?;
         match tenant.create_timeline(
             new_timeline_id,
             request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -464,7 +465,10 @@ async fn timeline_create_handler(
             Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
         }
     }
-    .instrument(info_span!("timeline_create", %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
+    .instrument(info_span!("timeline_create",
+        tenant_id = %tenant_shard_id.tenant_id,
+        shard = %tenant_shard_id.shard_slug(),
+        timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
     .await
 }

@@ -660,14 +664,15 @@ async fn timeline_delete_handler(
     request: Request<Body>,
     _cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
-    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
-    check_permission(&request, Some(tenant_id))?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+    let state = get_state(&request);

-    mgr::delete_timeline(tenant_id, timeline_id, &ctx)
-        .instrument(info_span!("timeline_delete", %tenant_id, %timeline_id))
+    state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx)
+        .instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
         .await?;

     json_response(StatusCode::ACCEPTED, ())
@@ -681,11 +686,14 @@ async fn tenant_detach_handler(
     check_permission(&request, Some(tenant_id))?;
     let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;

+    // This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants
+    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
+
     let state = get_state(&request);
     let conf = state.conf;
     mgr::detach_tenant(
         conf,
-        tenant_id,
+        tenant_shard_id,
         detach_ignored.unwrap_or(false),
         &state.deletion_queue_client,
     )
@@ -802,13 +810,16 @@ async fn tenant_delete_handler(
     _cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
     // TODO openapi spec
-    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
-    check_permission(&request, Some(tenant_id))?;
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

     let state = get_state(&request);

-    mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
-        .instrument(info_span!("tenant_delete_handler", %tenant_id))
+    mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_shard_id)
+        .instrument(info_span!("tenant_delete_handler",
+            tenant_id = %tenant_shard_id.tenant_id,
+            shard = tenant_shard_id.shard_slug()
+        ))
         .await?;

     json_response(StatusCode::ACCEPTED, ())
@@ -1138,9 +1149,10 @@ async fn put_tenant_location_config_handler(
     mut request: Request<Body>,
     _cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+
     let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
-    let tenant_id = request_data.tenant_id;
-    check_permission(&request, Some(tenant_id))?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
     let state = get_state(&request);
@@ -1149,9 +1161,13 @@ async fn put_tenant_location_config_handler(
     // The `Detached` state is special, it doesn't upsert a tenant, it removes
     // its local disk content and drops it from memory.
     if let LocationConfigMode::Detached = request_data.config.mode {
-        if let Err(e) = mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
-            .instrument(info_span!("tenant_detach", %tenant_id))
-            .await
+        if let Err(e) =
+            mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
+                .instrument(info_span!("tenant_detach",
+                    tenant_id = %tenant_shard_id.tenant_id,
+                    shard = tenant_shard_id.shard_slug()
+                ))
+                .await
         {
             match e {
                 TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
@@ -1168,7 +1184,7 @@ async fn put_tenant_location_config_handler(

     state
         .tenant_manager
-        .upsert_location(tenant_id, location_conf, &ctx)
+        .upsert_location(tenant_shard_id, location_conf, &ctx)
         .await
     // TODO: badrequest assumes the caller was asking for something unreasonable, but in
     // principle we might have hit something like concurrent API calls to the same tenant,
@@ -1752,7 +1768,7 @@ pub fn make_router(
         .get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
         .post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
         .get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
-        .delete("/v1/tenant/:tenant_id", |r| {
+        .delete("/v1/tenant/:tenant_shard_id", |r| {
             api_handler(r, tenant_delete_handler)
         })
         .get("/v1/tenant/:tenant_id/synthetic_size", |r| {
@@ -1764,13 +1780,13 @@ pub fn make_router(
         .get("/v1/tenant/:tenant_id/config", |r| {
             api_handler(r, get_tenant_config_handler)
         })
-        .put("/v1/tenant/:tenant_id/location_config", |r| {
+        .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
             api_handler(r, put_tenant_location_config_handler)
         })
         .get("/v1/tenant/:tenant_id/timeline", |r| {
             api_handler(r, timeline_list_handler)
         })
-        .post("/v1/tenant/:tenant_id/timeline", |r| {
+        .post("/v1/tenant/:tenant_shard_id/timeline", |r| {
             api_handler(r, timeline_create_handler)
         })
         .post("/v1/tenant/:tenant_id/attach", |r| {
@@ -1814,7 +1830,7 @@ pub fn make_router(
             "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
             |r| api_handler(r, timeline_download_remote_layers_handler_get),
         )
-        .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
+        .delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
             api_handler(r, timeline_delete_handler)
         })
         .get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {

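The route parameters renamed to `:tenant_shard_id` above stay compatible with legacy callers because `TenantShardId::from_str` accepts both the bare 32-character form and the suffixed form. A sketch of what parsing either URL segment yields (the helper function here is illustrative, not the real request-param plumbing):

```rust
use pageserver_api::shard::TenantShardId;
use std::str::FromStr;

// Hypothetical stand-in for parse_request_param::<TenantShardId>() above.
fn parse_tenant_shard_param(segment: &str) -> Result<TenantShardId, hex::FromHexError> {
    TenantShardId::from_str(segment)
}

fn main() {
    // Legacy URL segment: /v1/tenant/1f359dd625e519a1a4e8d7509690f6fc/...
    let legacy = parse_tenant_shard_param("1f359dd625e519a1a4e8d7509690f6fc").unwrap();
    assert_eq!(legacy.shard_count.0, 0); // decoded as unsharded

    // Sharded URL segment: /v1/tenant/1f359dd625e519a1a4e8d7509690f6fc-0102/...
    let sharded = parse_tenant_shard_param("1f359dd625e519a1a4e8d7509690f6fc-0102").unwrap();
    assert_eq!((sharded.shard_number.0, sharded.shard_count.0), (1, 2));
}
```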
@@ -1,106 +1,11 @@
 use crate::walrecord::NeonWalRecord;
-use anyhow::{bail, Result};
-use byteorder::{ByteOrder, BE};
+use anyhow::Result;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
-use std::fmt;
 use std::ops::{AddAssign, Range};
 use std::time::Duration;

-/// Key used in the Repository kv-store.
-///
-/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
-/// for what we actually store in these fields.
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
-pub struct Key {
-    pub field1: u8,
-    pub field2: u32,
-    pub field3: u32,
-    pub field4: u32,
-    pub field5: u8,
-    pub field6: u32,
-}
-
-pub const KEY_SIZE: usize = 18;
-
-impl Key {
-    /// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
-    /// As long as Neon does not support tablespace (because of lack of access to local file system),
-    /// we can assume that only some predefined namespace OIDs are used which can fit in u16
-    pub fn to_i128(&self) -> i128 {
-        assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
-        (((self.field1 & 0xf) as i128) << 120)
-            | (((self.field2 & 0xFFFF) as i128) << 104)
-            | ((self.field3 as i128) << 72)
-            | ((self.field4 as i128) << 40)
-            | ((self.field5 as i128) << 32)
-            | self.field6 as i128
-    }
-
-    pub const fn from_i128(x: i128) -> Self {
-        Key {
-            field1: ((x >> 120) & 0xf) as u8,
-            field2: ((x >> 104) & 0xFFFF) as u32,
-            field3: (x >> 72) as u32,
-            field4: (x >> 40) as u32,
-            field5: (x >> 32) as u8,
-            field6: x as u32,
-        }
-    }
-
-    pub fn next(&self) -> Key {
-        self.add(1)
-    }
-
-    pub fn add(&self, x: u32) -> Key {
-        let mut key = *self;
-
-        let r = key.field6.overflowing_add(x);
-        key.field6 = r.0;
-        if r.1 {
-            let r = key.field5.overflowing_add(1);
-            key.field5 = r.0;
-            if r.1 {
-                let r = key.field4.overflowing_add(1);
-                key.field4 = r.0;
-                if r.1 {
-                    let r = key.field3.overflowing_add(1);
-                    key.field3 = r.0;
-                    if r.1 {
-                        let r = key.field2.overflowing_add(1);
-                        key.field2 = r.0;
-                        if r.1 {
-                            let r = key.field1.overflowing_add(1);
-                            key.field1 = r.0;
-                            assert!(!r.1);
-                        }
-                    }
-                }
-            }
-        }
-        key
-    }
-
-    pub fn from_slice(b: &[u8]) -> Self {
-        Key {
-            field1: b[0],
-            field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
-            field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
-            field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
-            field5: b[13],
-            field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
-        }
-    }
-
-    pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
-        buf[0] = self.field1;
-        BE::write_u32(&mut buf[1..5], self.field2);
-        BE::write_u32(&mut buf[5..9], self.field3);
-        BE::write_u32(&mut buf[9..13], self.field4);
-        buf[13] = self.field5;
-        BE::write_u32(&mut buf[14..18], self.field6);
-    }
-}
+pub use pageserver_api::key::{Key, KEY_SIZE};

 pub fn key_range_size(key_range: &Range<Key>) -> u32 {
     let start = key_range.start;
@@ -129,49 +34,6 @@ pub fn singleton_range(key: Key) -> Range<Key> {
     key..key.next()
 }

-impl fmt::Display for Key {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(
-            f,
-            "{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
-            self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
-        )
-    }
-}
-
-impl Key {
-    pub const MIN: Key = Key {
-        field1: u8::MIN,
-        field2: u32::MIN,
-        field3: u32::MIN,
-        field4: u32::MIN,
-        field5: u8::MIN,
-        field6: u32::MIN,
-    };
-    pub const MAX: Key = Key {
-        field1: u8::MAX,
-        field2: u32::MAX,
-        field3: u32::MAX,
-        field4: u32::MAX,
-        field5: u8::MAX,
-        field6: u32::MAX,
-    };
-
-    pub fn from_hex(s: &str) -> Result<Self> {
-        if s.len() != 36 {
-            bail!("parse error");
-        }
-        Ok(Key {
-            field1: u8::from_str_radix(&s[0..2], 16)?,
-            field2: u32::from_str_radix(&s[2..10], 16)?,
-            field3: u32::from_str_radix(&s[10..18], 16)?,
-            field4: u32::from_str_radix(&s[18..26], 16)?,
-            field5: u8::from_str_radix(&s[26..28], 16)?,
-            field6: u32::from_str_radix(&s[28..36], 16)?,
-        })
-    }
-}
-
 /// A 'value' stored for a one Key.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[cfg_attr(test, derive(PartialEq))]

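The hunk above is a pure move: `Key` and `KEY_SIZE` now live in `pageserver_api::key` and are re-exported from this module, so existing imports keep compiling. A sketch of why, assuming `repository` remains a public module of the pageserver crate (that is not shown in this diff):

```rust
// Both paths name the same item, because `pub use` re-exports rather than copies:
use pageserver::repository::{Key, KEY_SIZE}; // old import path, still valid
use pageserver_api::key::Key as ApiKey;      // the type's new home

fn main() {
    let k: Key = ApiKey::MIN; // the two types unify
    assert_eq!(KEY_SIZE, 18);
    assert_eq!(k, Key::MIN);
}
```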
@@ -2,9 +2,10 @@
 //! page server.

 use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
+use pageserver_api::shard::TenantShardId;
 use rand::{distributions::Alphanumeric, Rng};
 use std::borrow::Cow;
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::ops::Deref;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -30,6 +31,7 @@ use crate::metrics::TENANT_MANAGER as METRICS;
 use crate::task_mgr::{self, TaskKind};
 use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
 use crate::tenant::delete::DeleteTenantFlow;
+use crate::tenant::span::debug_assert_current_span_has_tenant_id;
 use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
 use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};

@@ -87,10 +89,37 @@ pub(crate) enum TenantsMap {
     Initializing,
     /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
     /// New tenants can be added using [`tenant_map_acquire_slot`].
-    Open(HashMap<TenantId, TenantSlot>),
+    Open(BTreeMap<TenantShardId, TenantSlot>),
     /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
     /// Existing tenants are still accessible, but no new tenants can be created.
-    ShuttingDown(HashMap<TenantId, TenantSlot>),
+    ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
+}
+
+/// Helper for mapping shard-unaware functions to a sharding-aware map
+/// TODO(sharding): all users of this must be made shard-aware.
+fn exactly_one_or_none<'a>(
+    map: &'a BTreeMap<TenantShardId, TenantSlot>,
+    tenant_id: &TenantId,
+) -> Option<(&'a TenantShardId, &'a TenantSlot)> {
+    let mut slots = map.range(TenantShardId::tenant_range(*tenant_id));
+
+    // Retrieve the first two slots in the range: if both are populated, we must panic because the caller
+    // needs a shard-naive view of the world in which only one slot can exist for a TenantId at a time.
+    let slot_a = slots.next();
+    let slot_b = slots.next();
+    match (slot_a, slot_b) {
+        (None, None) => None,
+        (Some(slot), None) => {
+            // Exactly one matching slot
+            Some(slot)
+        }
+        (Some(_slot_a), Some(_slot_b)) => {
+            // Multiple shards for this tenant: cannot handle this yet.
+            // TODO(sharding): callers of get() should be shard-aware.
+            todo!("Attaching multiple shards in the same tenant to the same pageserver")
+        }
+        (None, Some(_)) => unreachable!(),
+    }
 }

 impl TenantsMap {
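The switch from `HashMap<TenantId, _>` to `BTreeMap<TenantShardId, _>` is what makes `exactly_one_or_none` cheap: `BTreeMap::range` over `tenant_range()` visits only that tenant's shards instead of scanning the whole map. A standalone sketch of the same lookup pattern, with a `String` payload standing in for `TenantSlot`:

```rust
use pageserver_api::shard::TenantShardId;
use std::collections::BTreeMap;
use std::str::FromStr;
use utils::id::TenantId;

fn main() {
    let t = TenantId::from_str("1f359dd625e519a1a4e8d7509690f6fc").unwrap();
    let mut map: BTreeMap<TenantShardId, String> = BTreeMap::new();
    map.insert(TenantShardId::unsharded(t), "slot".to_string());

    // Same shape as exactly_one_or_none(): peek at most two entries in the range.
    let mut slots = map.range(TenantShardId::tenant_range(t));
    match (slots.next(), slots.next()) {
        (Some((_id, slot)), None) => assert_eq!(slot, "slot"), // exactly one shard
        (None, None) => println!("tenant not found"),
        _ => panic!("multiple shards: caller must be shard-aware"),
    }
}
```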
@@ -101,7 +130,8 @@ impl TenantsMap {
|
|||||||
match self {
|
match self {
|
||||||
TenantsMap::Initializing => None,
|
TenantsMap::Initializing => None,
|
||||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
|
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
|
||||||
m.get(tenant_id).and_then(TenantSlot::get_attached)
|
// TODO(sharding): callers of get() should be shard-aware.
|
||||||
|
exactly_one_or_none(m, tenant_id).and_then(|(_, slot)| slot.get_attached())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -109,7 +139,10 @@ impl TenantsMap {
|
|||||||
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<TenantSlot> {
|
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<TenantSlot> {
|
||||||
match self {
|
match self {
|
||||||
TenantsMap::Initializing => None,
|
TenantsMap::Initializing => None,
|
||||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
|
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
|
||||||
|
let key = exactly_one_or_none(m, tenant_id).map(|(k, _)| *k);
|
||||||
|
key.and_then(|key| m.remove(&key))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -383,7 +416,7 @@ pub async fn init_tenant_mgr(
|
|||||||
init_order: InitializationOrder,
|
init_order: InitializationOrder,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
) -> anyhow::Result<TenantManager> {
|
) -> anyhow::Result<TenantManager> {
|
||||||
let mut tenants = HashMap::new();
|
let mut tenants = BTreeMap::new();
|
||||||
|
|
||||||
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
|
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
|
||||||
|
|
||||||
@@ -404,7 +437,7 @@ pub async fn init_tenant_mgr(
|
|||||||
warn!(%tenant_id, "Marking tenant broken, failed to {e:#}");
|
warn!(%tenant_id, "Marking tenant broken, failed to {e:#}");
|
||||||
|
|
||||||
tenants.insert(
|
tenants.insert(
|
||||||
tenant_id,
|
TenantShardId::unsharded(tenant_id),
|
||||||
TenantSlot::Attached(Tenant::create_broken_tenant(
|
TenantSlot::Attached(Tenant::create_broken_tenant(
|
||||||
conf,
|
conf,
|
||||||
tenant_id,
|
tenant_id,
|
||||||
@@ -427,7 +460,7 @@ pub async fn init_tenant_mgr(
|
|||||||
// tenants, because they do no remote writes and hence require no
|
// tenants, because they do no remote writes and hence require no
|
||||||
// generation number
|
// generation number
|
||||||
info!(%tenant_id, "Loaded tenant in secondary mode");
|
info!(%tenant_id, "Loaded tenant in secondary mode");
|
||||||
tenants.insert(tenant_id, TenantSlot::Secondary);
|
tenants.insert(TenantShardId::unsharded(tenant_id), TenantSlot::Secondary);
|
||||||
}
|
}
|
||||||
LocationMode::Attached(_) => {
|
LocationMode::Attached(_) => {
|
||||||
// TODO: augment re-attach API to enable the control plane to
|
// TODO: augment re-attach API to enable the control plane to
|
||||||
@@ -470,7 +503,10 @@ pub async fn init_tenant_mgr(
|
|||||||
&ctx,
|
&ctx,
|
||||||
) {
|
) {
|
||||||
Ok(tenant) => {
|
Ok(tenant) => {
|
||||||
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
|
tenants.insert(
|
||||||
|
TenantShardId::unsharded(tenant.tenant_id()),
|
||||||
|
TenantSlot::Attached(tenant),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(%tenant_id, "Failed to start tenant: {e:#}");
|
error!(%tenant_id, "Failed to start tenant: {e:#}");
|
||||||
@@ -573,19 +609,19 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
|||||||
let mut m = tenants.write().unwrap();
|
let mut m = tenants.write().unwrap();
|
||||||
match &mut *m {
|
match &mut *m {
|
||||||
TenantsMap::Initializing => {
|
TenantsMap::Initializing => {
|
||||||
*m = TenantsMap::ShuttingDown(HashMap::default());
|
*m = TenantsMap::ShuttingDown(BTreeMap::default());
|
||||||
info!("tenants map is empty");
|
info!("tenants map is empty");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
TenantsMap::Open(tenants) => {
|
TenantsMap::Open(tenants) => {
|
||||||
let mut shutdown_state = HashMap::new();
|
let mut shutdown_state = BTreeMap::new();
|
||||||
let mut total_in_progress = 0;
|
let mut total_in_progress = 0;
|
||||||
let mut total_attached = 0;
|
let mut total_attached = 0;
|
||||||
|
|
||||||
for (tenant_id, v) in tenants.drain() {
|
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
|
||||||
match v {
|
match v {
|
||||||
TenantSlot::Attached(t) => {
|
TenantSlot::Attached(t) => {
|
||||||
shutdown_state.insert(tenant_id, TenantSlot::Attached(t.clone()));
|
shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
|
||||||
join_set.spawn(
|
join_set.spawn(
|
||||||
async move {
|
async move {
|
||||||
let freeze_and_flush = true;
|
let freeze_and_flush = true;
|
||||||
@@ -604,13 +640,13 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
|||||||
// going to log too many lines
|
// going to log too many lines
|
||||||
debug!("tenant successfully stopped");
|
debug!("tenant successfully stopped");
|
||||||
}
|
}
|
||||||
.instrument(info_span!("shutdown", %tenant_id)),
|
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug())),
|
||||||
);
|
);
|
||||||
|
|
||||||
total_attached += 1;
|
total_attached += 1;
|
||||||
}
|
}
|
||||||
TenantSlot::Secondary => {
|
TenantSlot::Secondary => {
|
||||||
shutdown_state.insert(tenant_id, TenantSlot::Secondary);
|
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary);
|
||||||
}
|
}
|
||||||
TenantSlot::InProgress(notify) => {
|
TenantSlot::InProgress(notify) => {
|
||||||
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
|
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
|
||||||
@@ -690,19 +726,22 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
|||||||
pub(crate) async fn create_tenant(
|
pub(crate) async fn create_tenant(
|
||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
tenant_conf: TenantConfOpt,
|
tenant_conf: TenantConfOpt,
|
||||||
tenant_id: TenantId,
|
tenant_shard_id: TenantShardId,
|
||||||
generation: Generation,
|
generation: Generation,
|
||||||
resources: TenantSharedResources,
|
resources: TenantSharedResources,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<Arc<Tenant>, TenantMapInsertError> {
|
) -> Result<Arc<Tenant>, TenantMapInsertError> {
|
||||||
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
||||||
|
|
||||||
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
|
let slot_guard =
|
||||||
let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?;
|
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
|
||||||
|
// TODO(sharding): make local paths shard-aware
|
||||||
|
let tenant_path =
|
||||||
|
super::create_tenant_files(conf, &location_conf, &tenant_shard_id.tenant_id).await?;
|
||||||
|
|
||||||
let created_tenant = tenant_spawn(
|
let created_tenant = tenant_spawn(
|
||||||
conf,
|
conf,
|
||||||
tenant_id,
|
tenant_shard_id.tenant_id,
|
||||||
&tenant_path,
|
&tenant_path,
|
||||||
resources,
|
resources,
|
||||||
AttachedTenantConf::try_from(location_conf)?,
|
AttachedTenantConf::try_from(location_conf)?,
|
||||||
@@ -715,11 +754,7 @@ pub(crate) async fn create_tenant(
|
|||||||
// See https://github.com/neondatabase/neon/issues/4233
|
// See https://github.com/neondatabase/neon/issues/4233
|
||||||
|
|
||||||
let created_tenant_id = created_tenant.tenant_id();
|
let created_tenant_id = created_tenant.tenant_id();
|
||||||
if tenant_id != created_tenant_id {
|
debug_assert_eq!(created_tenant_id, tenant_shard_id.tenant_id);
|
||||||
return Err(TenantMapInsertError::Other(anyhow::anyhow!(
|
|
||||||
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {created_tenant_id})",
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?;
|
slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?;
|
||||||
|
|
||||||
@@ -755,21 +790,70 @@ pub(crate) async fn set_new_tenant_config(
 }

 impl TenantManager {
-    #[instrument(skip_all, fields(%tenant_id))]
+    /// Gets the attached tenant from the in-memory data, erroring if it's absent,
+    /// in secondary mode, or does not fit the query.
+    /// `active_only = true` allows querying only tenants that are ready for operations,
+    /// erroring on other kinds of tenants.
+    ///
+    /// This method is cancel-safe.
+    pub(crate) fn get_attached_tenant_shard(
+        &self,
+        tenant_shard_id: TenantShardId,
+        active_only: bool,
+    ) -> Result<Arc<Tenant>, GetTenantError> {
+        let locked = self.tenants.read().unwrap();
+
+        let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
+
+        match peek_slot {
+            Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
+                TenantState::Broken {
+                    reason,
+                    backtrace: _,
+                } if active_only => Err(GetTenantError::Broken(reason)),
+                TenantState::Active => Ok(Arc::clone(tenant)),
+                _ => {
+                    if active_only {
+                        Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
+                    } else {
+                        Ok(Arc::clone(tenant))
+                    }
+                }
+            },
+            Some(TenantSlot::InProgress(_)) => {
+                Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
+            }
+            None | Some(TenantSlot::Secondary) => {
+                Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
+            }
+        }
+    }
+
+    pub(crate) async fn delete_timeline(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        _ctx: &RequestContext,
+    ) -> Result<(), DeleteTimelineError> {
+        let tenant = self.get_attached_tenant_shard(tenant_shard_id, true)?;
+        DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
+        Ok(())
+    }

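Aside: the lookup semantics of `get_attached_tenant_shard` can be summarized with a small self-contained model (the types here are illustrative stand-ins, not the pageserver's): an attached, active tenant succeeds; with `active_only`, in-progress slots and non-active attached states are rejected; secondary or absent slots are not found.

    #[derive(Debug)]
    enum Slot { Attached { active: bool }, Secondary, InProgress }

    #[derive(Debug)]
    enum GetError { NotActive, NotFound }

    fn get_attached(slot: Option<&Slot>, active_only: bool) -> Result<(), GetError> {
        match slot {
            Some(Slot::Attached { active: true }) => Ok(()),
            Some(Slot::Attached { active: false }) if active_only => Err(GetError::NotActive),
            Some(Slot::Attached { .. }) => Ok(()),
            Some(Slot::InProgress) => Err(GetError::NotActive),
            None | Some(Slot::Secondary) => Err(GetError::NotFound),
        }
    }

    fn main() {
        assert!(get_attached(Some(&Slot::Attached { active: true }), true).is_ok());
        assert!(get_attached(Some(&Slot::InProgress), true).is_err());
        assert!(get_attached(Some(&Slot::Secondary), false).is_err());
        assert!(get_attached(None, false).is_err());
        println!("semantics check passed");
    }
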
     pub(crate) async fn upsert_location(
         &self,
-        tenant_id: TenantId,
+        tenant_shard_id: TenantShardId,
         new_location_config: LocationConf,
         ctx: &RequestContext,
     ) -> Result<(), anyhow::Error> {
-        info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
+        debug_assert_current_span_has_tenant_id();
+        info!("configuring tenant location to state {new_location_config:?}");

         // Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
         // then we do not need to set the slot to InProgress, we can just call into the
         // existing tenant.
         {
             let locked = self.tenants.read().unwrap();
-            let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Write)?;
+            let peek_slot =
+                tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
             match (&new_location_config.mode, peek_slot) {
                 (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
                     if attach_conf.generation == tenant.generation {
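Aside: the fast path applies only when the incoming attach generation equals the running tenant's generation; otherwise the slow path (slot transition and tenant restart) is required. A toy model of that decision, with all names illustrative rather than the pageserver's API:

    #[derive(Clone, Copy, PartialEq, Debug)]
    struct Generation(u32);

    struct Tenant {
        generation: Generation,
        config: String,
    }

    // Returns true if the fast path handled the update in place.
    fn upsert_location(tenant: &mut Tenant, new_generation: Generation, new_config: &str) -> bool {
        if new_generation == tenant.generation {
            // Fast path: same generation, config-only update; no slot transition.
            tenant.config = new_config.to_string();
            return true;
        }
        false // slow path: caller must acquire the slot and restart the tenant
    }

    fn main() {
        let mut t = Tenant { generation: Generation(3), config: "old".into() };
        assert!(upsert_location(&mut t, Generation(3), "new"));
        assert_eq!(t.config, "new");
        assert!(!upsert_location(&mut t, Generation(4), "newer"));
    }
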
@@ -800,7 +884,7 @@ impl TenantManager {
         // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
         // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
         // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
-        let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?;
+        let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;

         if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
             // The case where we keep a Tenant alive was covered above in the special case
@@ -831,25 +915,31 @@ impl TenantManager {
             slot_guard.drop_old_value().expect("We just shut it down");
         }

-        let tenant_path = self.conf.tenant_path(&tenant_id);
+        // TODO(sharding): make local paths sharding-aware
+        let tenant_path = self.conf.tenant_path(&tenant_shard_id.tenant_id);

         let new_slot = match &new_location_config.mode {
             LocationMode::Secondary(_) => {
-                let tenant_path = self.conf.tenant_path(&tenant_id);
                 // Directory doesn't need to be fsync'd because if we crash it can
                 // safely be recreated next time this tenant location is configured.
                 unsafe_create_dir_all(&tenant_path)
                     .await
                     .with_context(|| format!("Creating {tenant_path}"))?;

-                Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
-                    .await
-                    .map_err(SetNewTenantConfigError::Persist)?;
+                // TODO(sharding): make local paths sharding-aware
+                Tenant::persist_tenant_config(
+                    self.conf,
+                    &tenant_shard_id.tenant_id,
+                    &new_location_config,
+                )
+                .await
+                .map_err(SetNewTenantConfigError::Persist)?;

                 TenantSlot::Secondary
             }
             LocationMode::Attached(_attach_config) => {
-                let timelines_path = self.conf.timelines_path(&tenant_id);
+                // TODO(sharding): make local paths sharding-aware
+                let timelines_path = self.conf.timelines_path(&tenant_shard_id.tenant_id);

                 // Directory doesn't need to be fsync'd because we do not depend on
                 // it to exist after crashes: it may be recreated when tenant is
@@ -858,13 +948,19 @@ impl TenantManager {
                     .await
                     .with_context(|| format!("Creating {timelines_path}"))?;

-                Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
-                    .await
-                    .map_err(SetNewTenantConfigError::Persist)?;
+                // TODO(sharding): make local paths sharding-aware
+                Tenant::persist_tenant_config(
+                    self.conf,
+                    &tenant_shard_id.tenant_id,
+                    &new_location_config,
+                )
+                .await
+                .map_err(SetNewTenantConfigError::Persist)?;

+                // TODO(sharding): make spawn sharding-aware
                 let tenant = tenant_spawn(
                     self.conf,
-                    tenant_id,
+                    tenant_shard_id.tenant_id,
                     &tenant_path,
                     self.resources.clone(),
                     AttachedTenantConf::try_from(new_location_config)?,
@@ -910,7 +1006,11 @@ pub(crate) fn get_tenant(
     active_only: bool,
 ) -> Result<Arc<Tenant>, GetTenantError> {
     let locked = TENANTS.read().unwrap();
-    let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)?;
+
+    // TODO(sharding): make all callers of get_tenant shard-aware
+    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
+
+    let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;

     match peek_slot {
         Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
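Aside: `TenantShardId::unsharded(tenant_id)` is the bridging idiom this branch uses everywhere a legacy, shard-unaware caller holds only a `TenantId`. A self-contained sketch of the idea — the types below are illustrative stand-ins for `pageserver_api`'s, and the use of a zero shard count as the "unsharded" marker is an assumption:

    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
    struct TenantId(u128);

    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
    struct TenantShardId {
        tenant_id: TenantId,
        shard_number: u8,
        shard_count: u8,
    }

    impl TenantShardId {
        fn unsharded(tenant_id: TenantId) -> Self {
            // Assumption for this sketch: shard_count == 0 marks "legacy, unsharded".
            Self { tenant_id, shard_number: 0, shard_count: 0 }
        }
    }

    fn main() {
        let id = TenantShardId::unsharded(TenantId(42));
        assert_eq!(id.tenant_id, TenantId(42));
        println!("{id:?}");
    }
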
@@ -970,12 +1070,16 @@ pub(crate) async fn get_active_tenant_with_timeout(
         Tenant(Arc<Tenant>),
     }

+    // TODO(sharding): make page service interface sharding-aware (page service should apply ShardIdentity to the key
+    // to decide which shard services the request)
+    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
+
     let wait_start = Instant::now();
     let deadline = wait_start + timeout;

     let wait_for = {
         let locked = TENANTS.read().unwrap();
-        let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)
+        let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
             .map_err(GetTenantError::MapState)?;
         match peek_slot {
             Some(TenantSlot::Attached(tenant)) => {
@@ -1019,8 +1123,9 @@ pub(crate) async fn get_active_tenant_with_timeout(
     })?;
     {
         let locked = TENANTS.read().unwrap();
-        let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)
-            .map_err(GetTenantError::MapState)?;
+        let peek_slot =
+            tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
+                .map_err(GetTenantError::MapState)?;
         match peek_slot {
             Some(TenantSlot::Attached(tenant)) => tenant.clone(),
             _ => {
@@ -1062,7 +1167,7 @@ pub(crate) async fn get_active_tenant_with_timeout(
 pub(crate) async fn delete_tenant(
     conf: &'static PageServerConf,
     remote_storage: Option<GenericRemoteStorage>,
-    tenant_id: TenantId,
+    tenant_shard_id: TenantShardId,
 ) -> Result<(), DeleteTenantError> {
     // We acquire a SlotGuard during this function to protect against concurrent
     // changes while the ::prepare phase of DeleteTenantFlow executes, but then
@@ -1075,7 +1180,9 @@ pub(crate) async fn delete_tenant(
     //
     // See https://github.com/neondatabase/neon/issues/5080

-    let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustExist)?;
+    // TODO(sharding): make delete API sharding-aware
+    let mut slot_guard =
+        tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;

     // unwrap is safe because we used MustExist mode when acquiring
     let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
@@ -1102,16 +1209,6 @@ pub(crate) enum DeleteTimelineError {
     Timeline(#[from] crate::tenant::DeleteTimelineError),
 }

-pub(crate) async fn delete_timeline(
-    tenant_id: TenantId,
-    timeline_id: TimelineId,
-    _ctx: &RequestContext,
-) -> Result<(), DeleteTimelineError> {
-    let tenant = get_tenant(tenant_id, true)?;
-    DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
-    Ok(())
-}
-
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum TenantStateError {
     #[error("Tenant {0} is stopping")]
@@ -1126,14 +1223,14 @@ pub(crate) enum TenantStateError {

 pub(crate) async fn detach_tenant(
     conf: &'static PageServerConf,
-    tenant_id: TenantId,
+    tenant_shard_id: TenantShardId,
     detach_ignored: bool,
     deletion_queue_client: &DeletionQueueClient,
 ) -> Result<(), TenantStateError> {
     let tmp_path = detach_tenant0(
         conf,
         &TENANTS,
-        tenant_id,
+        tenant_shard_id,
         detach_ignored,
         deletion_queue_client,
     )
@@ -1160,19 +1257,24 @@ pub(crate) async fn detach_tenant(
 async fn detach_tenant0(
     conf: &'static PageServerConf,
     tenants: &std::sync::RwLock<TenantsMap>,
-    tenant_id: TenantId,
+    tenant_shard_id: TenantShardId,
     detach_ignored: bool,
     deletion_queue_client: &DeletionQueueClient,
 ) -> Result<Utf8PathBuf, TenantStateError> {
-    let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
-        let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
+    let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
+        // TODO(sharding): make local path helpers shard-aware
+        let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean.tenant_id);
         safe_rename_tenant_dir(&local_tenant_directory)
             .await
             .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
     };

-    let removal_result =
-        remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
+    let removal_result = remove_tenant_from_memory(
+        tenants,
+        tenant_shard_id,
+        tenant_dir_rename_operation(tenant_shard_id),
+    )
+    .await;

     // Flush pending deletions, so that they have a good chance of passing validation
     // before this tenant is potentially re-attached elsewhere.
@@ -1186,12 +1288,15 @@ async fn detach_tenant0(
         Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
     )
     {
-        let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
+        // TODO(sharding): make local paths sharding-aware
+        let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id.tenant_id);
         if tenant_ignore_mark.exists() {
             info!("Detaching an ignored tenant");
-            let tmp_path = tenant_dir_rename_operation(tenant_id)
+            let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
                 .await
-                .with_context(|| format!("Ignored tenant {tenant_id} local directory rename"))?;
+                .with_context(|| {
+                    format!("Ignored tenant {tenant_shard_id} local directory rename")
+                })?;
             return Ok(tmp_path);
         }
     }
@@ -1208,7 +1313,11 @@ pub(crate) async fn load_tenant(
     deletion_queue_client: DeletionQueueClient,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
-    let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
+    // This is a legacy API (replaced by `/location_conf`). It does not support sharding
+    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
+
+    let slot_guard =
+        tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
     let tenant_path = conf.tenant_path(&tenant_id);

     let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
@@ -1261,7 +1370,10 @@ async fn ignore_tenant0(
     tenants: &std::sync::RwLock<TenantsMap>,
     tenant_id: TenantId,
 ) -> Result<(), TenantStateError> {
-    remove_tenant_from_memory(tenants, tenant_id, async {
+    // This is a legacy API (replaced by `/location_conf`). It does not support sharding
+    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
+
+    remove_tenant_from_memory(tenants, tenant_shard_id, async {
         let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id);
         fs::File::create(&ignore_mark_file)
             .await
@@ -1270,7 +1382,7 @@ async fn ignore_tenant0(
                 crashsafe::fsync_file_and_parent(&ignore_mark_file)
                     .context("Failed to fsync ignore mark file")
             })
-            .with_context(|| format!("Failed to create ignore mark for tenant {tenant_id}"))?;
+            .with_context(|| format!("Failed to create ignore mark for tenant {tenant_shard_id}"))?;
         Ok(())
     })
     .await
@@ -1293,10 +1405,12 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
     };
     Ok(m.iter()
         .filter_map(|(id, tenant)| match tenant {
-            TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
+            TenantSlot::Attached(tenant) => Some((id, tenant.current_state())),
             TenantSlot::Secondary => None,
            TenantSlot::InProgress(_) => None,
         })
+        // TODO(sharding): make callers of this function shard-aware
+        .map(|(k, v)| (k.tenant_id, v))
         .collect())
 }

@@ -1312,7 +1426,11 @@ pub(crate) async fn attach_tenant(
     resources: TenantSharedResources,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
-    let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
+    // This is a legacy API (replaced by `/location_conf`). It does not support sharding
+    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
+
+    let slot_guard =
+        tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
     let location_conf = LocationConf::attached_single(tenant_conf, generation);
     let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?;
     // TODO: tenant directory remains on disk if we bail out from here on.
@@ -1359,14 +1477,14 @@ pub(crate) enum TenantMapInsertError {
 pub enum TenantSlotError {
     /// When acquiring a slot with the expectation that the tenant already exists.
     #[error("Tenant {0} not found")]
-    NotFound(TenantId),
+    NotFound(TenantShardId),

     /// When acquiring a slot with the expectation that the tenant does not already exist.
     #[error("tenant {0} already exists, state: {1:?}")]
-    AlreadyExists(TenantId, TenantState),
+    AlreadyExists(TenantShardId, TenantState),

     #[error("tenant {0} already exists but is not attached")]
-    Conflict(TenantId),
+    Conflict(TenantShardId),

     // Tried to read a slot that is currently being mutated by another administrative
     // operation.
@@ -1428,7 +1546,7 @@ pub enum TenantMapError {
 /// `drop_old_value`. It is an error to call this without shutting down
 /// the contents of `old_value`.
 pub struct SlotGuard {
-    tenant_id: TenantId,
+    tenant_shard_id: TenantShardId,
     old_value: Option<TenantSlot>,
     upserted: bool,

@@ -1439,12 +1557,12 @@ pub struct SlotGuard {

 impl SlotGuard {
     fn new(
-        tenant_id: TenantId,
+        tenant_shard_id: TenantShardId,
         old_value: Option<TenantSlot>,
         completion: utils::completion::Completion,
     ) -> Self {
         Self {
-            tenant_id,
+            tenant_shard_id,
             old_value,
             upserted: false,
             _completion: completion,
@@ -1487,7 +1605,7 @@ impl SlotGuard {
             TenantsMap::Open(m) => m,
         };

-        let replaced = m.insert(self.tenant_id, new_value);
+        let replaced = m.insert(self.tenant_shard_id, new_value);
         self.upserted = true;

         METRICS.tenant_slots.set(m.len() as u64);
@@ -1506,7 +1624,7 @@ impl SlotGuard {
             None => {
                 METRICS.unexpected_errors.inc();
                 error!(
-                    tenant_id = %self.tenant_id,
+                    tenant_shard_id = %self.tenant_shard_id,
                     "Missing InProgress marker during tenant upsert, this is a bug."
                 );
                 Err(TenantSlotUpsertError::InternalError(
@@ -1515,7 +1633,7 @@ impl SlotGuard {
             }
             Some(slot) => {
                 METRICS.unexpected_errors.inc();
-                error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
+                error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
                 Err(TenantSlotUpsertError::InternalError(
                     "Unexpected contents of TenantSlot".into(),
                 ))
@@ -1593,12 +1711,12 @@ impl Drop for SlotGuard {
             TenantsMap::Open(m) => m,
         };

-        use std::collections::hash_map::Entry;
-        match m.entry(self.tenant_id) {
+        use std::collections::btree_map::Entry;
+        match m.entry(self.tenant_shard_id) {
             Entry::Occupied(mut entry) => {
                 if !matches!(entry.get(), TenantSlot::InProgress(_)) {
                     METRICS.unexpected_errors.inc();
-                    error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
+                    error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
                 }

                 if self.old_value_is_shutdown() {
@@ -1610,7 +1728,7 @@ impl Drop for SlotGuard {
             Entry::Vacant(_) => {
                 METRICS.unexpected_errors.inc();
                 error!(
-                    tenant_id = %self.tenant_id,
+                    tenant_shard_id = %self.tenant_shard_id,
                     "Missing InProgress marker during SlotGuard drop, this is a bug."
                 );
             }
@@ -1629,7 +1747,7 @@ enum TenantSlotPeekMode {

 fn tenant_map_peek_slot<'a>(
     tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
-    tenant_id: &TenantId,
+    tenant_shard_id: &TenantShardId,
     mode: TenantSlotPeekMode,
 ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
     let m = match tenants.deref() {
@@ -1643,7 +1761,7 @@ fn tenant_map_peek_slot<'a>(
         TenantsMap::Open(m) => m,
     };

-    Ok(m.get(tenant_id))
+    Ok(m.get(tenant_shard_id))
 }

 enum TenantSlotAcquireMode {
@@ -1656,14 +1774,14 @@ enum TenantSlotAcquireMode {
 }

 fn tenant_map_acquire_slot(
-    tenant_id: &TenantId,
+    tenant_shard_id: &TenantShardId,
     mode: TenantSlotAcquireMode,
 ) -> Result<SlotGuard, TenantSlotError> {
-    tenant_map_acquire_slot_impl(tenant_id, &TENANTS, mode)
+    tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
 }

 fn tenant_map_acquire_slot_impl(
-    tenant_id: &TenantId,
+    tenant_shard_id: &TenantShardId,
     tenants: &std::sync::RwLock<TenantsMap>,
     mode: TenantSlotAcquireMode,
 ) -> Result<SlotGuard, TenantSlotError> {
@@ -1671,7 +1789,7 @@ fn tenant_map_acquire_slot_impl(
     METRICS.tenant_slot_writes.inc();

     let mut locked = tenants.write().unwrap();
-    let span = tracing::info_span!("acquire_slot", %tenant_id);
+    let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard=tenant_shard_id.shard_slug());
     let _guard = span.enter();

     let m = match &mut *locked {
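Aside: the span now records the tenant and the shard as separate structured fields instead of one combined value. A runnable sketch with the `tracing` and `tracing-subscriber` crates (Cargo dependencies assumed; the slug format shown is illustrative):

    use tracing::info_span;

    fn main() {
        tracing_subscriber::fmt().init();
        let tenant_id = "5e2f76cdb2fbe7f2881e4c11c1a3eef5";
        let shard_slug = "0001"; // illustrative slug format
        // `%` records the values with their Display impls as span fields.
        let span = info_span!("acquire_slot", tenant_id = %tenant_id, shard = %shard_slug);
        let _guard = span.enter();
        tracing::info!("slot acquired");
    }
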
@@ -1680,19 +1798,21 @@ fn tenant_map_acquire_slot_impl(
         TenantsMap::Open(m) => m,
     };

-    use std::collections::hash_map::Entry;
-    let entry = m.entry(*tenant_id);
+    use std::collections::btree_map::Entry;
+
+    let entry = m.entry(*tenant_shard_id);

     match entry {
         Entry::Vacant(v) => match mode {
             MustExist => {
                 tracing::debug!("Vacant && MustExist: return NotFound");
-                Err(TenantSlotError::NotFound(*tenant_id))
+                Err(TenantSlotError::NotFound(*tenant_shard_id))
             }
             _ => {
                 let (completion, barrier) = utils::completion::channel();
                 v.insert(TenantSlot::InProgress(barrier));
                 tracing::debug!("Vacant, inserted InProgress");
-                Ok(SlotGuard::new(*tenant_id, None, completion))
+                Ok(SlotGuard::new(*tenant_shard_id, None, completion))
             }
         },
         Entry::Occupied(mut o) => {
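Aside: only the `use` path changes here because `HashMap` and `BTreeMap` expose the same `Entry` API shape; `BTreeMap` is presumably chosen so `TenantShardId` keys iterate in order, keeping a tenant's shards contiguous. A minimal demo:

    use std::collections::btree_map::Entry;
    use std::collections::BTreeMap;

    fn main() {
        // (tenant, shard)-style composite key, illustrative only.
        let mut m: BTreeMap<(u32, u8), &str> = BTreeMap::new();
        match m.entry((42, 0)) {
            Entry::Vacant(v) => {
                v.insert("in_progress");
            }
            Entry::Occupied(mut o) => {
                o.insert("replaced");
            }
        }
        // BTreeMap keeps keys ordered, so shards of one tenant iterate together.
        assert_eq!(m.into_iter().next(), Some(((42, 0), "in_progress")));
    }
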
@@ -1706,7 +1826,7 @@ fn tenant_map_acquire_slot_impl(
                 TenantSlot::Attached(tenant) => {
                     tracing::debug!("Attached && MustNotExist, return AlreadyExists");
                     Err(TenantSlotError::AlreadyExists(
-                        *tenant_id,
+                        *tenant_shard_id,
                         tenant.current_state(),
                     ))
                 }
@@ -1715,7 +1835,7 @@ fn tenant_map_acquire_slot_impl(
                     // to get the state from
                     tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
                     Err(TenantSlotError::AlreadyExists(
-                        *tenant_id,
+                        *tenant_shard_id,
                         TenantState::Broken {
                             reason: "Present but not attached".to_string(),
                             backtrace: "".to_string(),
@@ -1728,7 +1848,11 @@ fn tenant_map_acquire_slot_impl(
                 let (completion, barrier) = utils::completion::channel();
                 let old_value = o.insert(TenantSlot::InProgress(barrier));
                 tracing::debug!("Occupied, replaced with InProgress");
-                Ok(SlotGuard::new(*tenant_id, Some(old_value), completion))
+                Ok(SlotGuard::new(
+                    *tenant_shard_id,
+                    Some(old_value),
+                    completion,
+                ))
             }
         }
     }
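Aside: a toy, self-contained model of the slot-guard discipline used throughout this file — acquiring a slot replaces its contents with an InProgress marker, and the guard either upserts a new value or restores the old one when dropped. This mirrors the behavior the comments describe; the real SlotGuard additionally tracks shutdown state, completions, and metrics.

    use std::collections::BTreeMap;

    #[derive(Debug, PartialEq)]
    enum Slot { Attached(&'static str), InProgress }

    struct Guard<'a> {
        map: &'a mut BTreeMap<u32, Slot>,
        key: u32,
        old: Option<Slot>,
        upserted: bool,
    }

    impl<'a> Guard<'a> {
        fn acquire(map: &'a mut BTreeMap<u32, Slot>, key: u32) -> Self {
            // Replace the slot with an InProgress marker, remembering the old value.
            let old = map.insert(key, Slot::InProgress);
            Guard { map, key, old, upserted: false }
        }
        fn upsert(&mut self, new: Slot) {
            self.map.insert(self.key, new);
            self.upserted = true;
        }
    }

    impl Drop for Guard<'_> {
        fn drop(&mut self) {
            if !self.upserted {
                // Revert: restore the old value, or clear the InProgress marker.
                match self.old.take() {
                    Some(old) => self.map.insert(self.key, old),
                    None => self.map.remove(&self.key),
                };
            }
        }
    }

    fn main() {
        let mut m = BTreeMap::from([(1, Slot::Attached("t1"))]);
        {
            let _g = Guard::acquire(&mut m, 1); // dropped without upsert => reverted
        }
        assert_eq!(m.get(&1), Some(&Slot::Attached("t1")));
    }
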
@@ -1741,7 +1865,7 @@ fn tenant_map_acquire_slot_impl(
 /// operation would be needed to remove it.
 async fn remove_tenant_from_memory<V, F>(
     tenants: &std::sync::RwLock<TenantsMap>,
-    tenant_id: TenantId,
+    tenant_shard_id: TenantShardId,
     tenant_cleanup: F,
 ) -> Result<V, TenantStateError>
 where
@@ -1750,7 +1874,7 @@ where
     use utils::completion;

     let mut slot_guard =
-        tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?;
+        tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;

     // The SlotGuard allows us to manipulate the Tenant object without fear of some
     // concurrent API request doing something else for the same tenant ID.
@@ -1777,7 +1901,7 @@ where
                 // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
                 // wait for it but return an error right away because these are distinct requests.
                 slot_guard.revert();
-                return Err(TenantStateError::IsStopping(tenant_id));
+                return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id));
             }
         }
     }
@@ -1788,7 +1912,7 @@ where

     match tenant_cleanup
         .await
-        .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
+        .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
     {
         Ok(hook_value) => {
             // Success: drop the old TenantSlot::Attached.
@@ -1867,7 +1991,8 @@ pub(crate) async fn immediate_gc(

 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
+    use pageserver_api::shard::TenantShardId;
+    use std::collections::BTreeMap;
     use std::sync::Arc;
     use tracing::{info_span, Instrument};

@@ -1887,12 +2012,12 @@ mod tests {

         // harness loads it to active, which is forced and nothing is running on the tenant

-        let id = t.tenant_id();
+        let id = TenantShardId::unsharded(t.tenant_id());

         // tenant harness configures the logging and we cannot escape it
         let _e = info_span!("testing", tenant_id = %id).entered();

-        let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]);
+        let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
         let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));

         // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually

@@ -1,7 +1,9 @@
 //! User credentials used in authentication.

 use crate::{
-    auth::password_hack::parse_endpoint_param, error::UserFacingError, proxy::neon_options,
+    auth::password_hack::parse_endpoint_param,
+    error::UserFacingError,
+    proxy::{neon_options, NUM_CONNECTION_ACCEPTED_BY_SNI},
 };
 use itertools::Itertools;
 use pq_proto::StartupMessageParams;
@@ -124,6 +126,22 @@ impl<'a> ClientCredentials<'a> {
             .transpose()?;

         info!(user, project = project.as_deref(), "credentials");
+        if sni.is_some() {
+            info!("Connection with sni");
+            NUM_CONNECTION_ACCEPTED_BY_SNI
+                .with_label_values(&["sni"])
+                .inc();
+        } else if project.is_some() {
+            NUM_CONNECTION_ACCEPTED_BY_SNI
+                .with_label_values(&["no_sni"])
+                .inc();
+            info!("Connection without sni");
+        } else {
+            NUM_CONNECTION_ACCEPTED_BY_SNI
+                .with_label_values(&["password_hack"])
+                .inc();
+            info!("Connection with password hack");
+        }

         let cache_key = format!(
             "{}{}",

@@ -129,6 +129,15 @@ pub static RATE_LIMITER_LIMIT: Lazy<IntGaugeVec> = Lazy::new(|| {
     .unwrap()
 });

+pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        "proxy_accepted_connections_by_sni",
+        "Number of connections (per sni).",
+        &["kind"],
+    )
+    .unwrap()
+});
+
 pub struct LatencyTimer {
     // time since the stopwatch was started
     start: Option<Instant>,
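Aside: a self-contained sketch of the same counter pattern, assuming the `prometheus` and `once_cell` crates as dependencies — the vec is registered once with the default registry, incremented with a label per connection kind, and can then be rendered in the text exposition format:

    use once_cell::sync::Lazy;
    use prometheus::{register_int_counter_vec, Encoder, IntCounterVec, TextEncoder};

    static BY_SNI: Lazy<IntCounterVec> = Lazy::new(|| {
        register_int_counter_vec!(
            "proxy_accepted_connections_by_sni",
            "Number of connections (per sni).",
            &["kind"],
        )
        .unwrap()
    });

    fn main() {
        BY_SNI.with_label_values(&["sni"]).inc();
        BY_SNI.with_label_values(&["password_hack"]).inc();

        // Render everything in the default registry as text exposition format.
        let mut buf = Vec::new();
        TextEncoder::new()
            .encode(&prometheus::gather(), &mut buf)
            .unwrap();
        println!("{}", String::from_utf8(buf).unwrap());
    }
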

@@ -210,12 +210,7 @@ impl GlobalConnPool {
                 client.session.send(session_id)?;
                 latency_timer.pool_hit();
                 latency_timer.success();
-                return Ok(Client {
-                    conn_id: client.conn_id,
-                    inner: Some(client),
-                    span: Span::current(),
-                    pool,
-                });
+                return Ok(Client::new(client, pool).await);
             }
         } else {
             let conn_id = uuid::Uuid::new_v4();
@@ -263,15 +258,11 @@ impl GlobalConnPool {
             _ => {}
         }

-        new_client.map(|inner| Client {
-            conn_id: inner.conn_id,
-            inner: Some(inner),
-            span: Span::current(),
-            pool,
-        })
+        // new_client.map(|inner| Client::new(inner, pool).await)
+        Ok(Client::new(new_client?, pool).await)
     }

-    fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
+    fn put(&self, conn_info: &ConnInfo, client: ClientInner, pid: i32) -> anyhow::Result<()> {
         let conn_id = client.conn_id;

         // We want to hold this open while we return. This ensures that the pool can't close
@@ -315,9 +306,9 @@ impl GlobalConnPool {

         // do logging outside of the mutex
         if returned {
-            info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
+            info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}, pid={pid}");
         } else {
-            info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
+            info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}, pid={pid}");
         }

         Ok(())
@@ -528,6 +519,22 @@ struct ClientInner {
     conn_id: uuid::Uuid,
 }

+impl ClientInner {
+    pub async fn get_pid(&mut self) -> anyhow::Result<i32> {
+        let rows = self.inner.query("select pg_backend_pid();", &[]).await?;
+        if rows.len() != 1 {
+            Err(anyhow::anyhow!(
+                "expected 1 row from pg_backend_pid(), got {}",
+                rows.len()
+            ))
+        } else {
+            let pid = rows[0].get(0);
+            info!(%pid, "got pid");
+            Ok(pid)
+        }
+    }
+}
+
 impl Client {
     pub fn metrics(&self) -> Arc<MetricCounter> {
         USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone())
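Aside: `pg_backend_pid()` returns the server backend's process id as a single `i32` row. A hedged, standalone version of the same probe using the `tokio` and `tokio-postgres` crates (the connection string is a placeholder, not the proxy's):

    use tokio_postgres::NoTls;

    #[tokio::main]
    async fn main() -> Result<(), tokio_postgres::Error> {
        let (client, connection) =
            tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
        // The connection object drives the socket; it must be polled on its own task.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {e}");
            }
        });

        let row = client.query_one("select pg_backend_pid();", &[]).await?;
        let pid: i32 = row.get(0);
        println!("backend pid: {pid}");
        Ok(())
    }
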
@@ -539,6 +546,7 @@ pub struct Client {
     span: Span,
     inner: Option<ClientInner>,
     pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
+    pid: i32,
 }

 pub struct Discard<'a> {
@@ -547,12 +555,25 @@ pub struct Discard<'a> {
 }

 impl Client {
+    pub(self) async fn new(
+        mut inner: ClientInner,
+        pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
+    ) -> Self {
+        Self {
+            conn_id: inner.conn_id,
+            pid: inner.get_pid().await.unwrap_or(-1),
+            inner: Some(inner),
+            span: Span::current(),
+            pool,
+        }
+    }
     pub fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
         let Self {
             inner,
             pool,
             conn_id,
             span: _,
+            pid: _,
         } = self;
         (
             &mut inner
@@ -609,10 +630,11 @@ impl Drop for Client {
             .expect("client inner should not be removed");
         if let Some((conn_info, conn_pool)) = self.pool.take() {
             let current_span = self.span.clone();
+            let pid = self.pid;
             // return connection to the pool
             tokio::task::spawn_blocking(move || {
                 let _span = current_span.enter();
-                let _ = conn_pool.put(&conn_info, client);
+                let _ = conn_pool.put(&conn_info, client, pid);
             });
         }
     }
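Aside: `Drop` cannot await, so the pool return is handed to a blocking task while the drop itself stays cheap. A minimal model of that pattern, assuming the `tokio` crate with full features; the names are illustrative stand-ins for the pool types above:

    struct PooledThing {
        payload: Option<String>,
    }

    impl Drop for PooledThing {
        fn drop(&mut self) {
            if let Some(payload) = self.payload.take() {
                tokio::task::spawn_blocking(move || {
                    // Stand-in for conn_pool.put(...): potentially slow bookkeeping
                    // runs off the async runtime's worker threads.
                    println!("returned {payload} to pool");
                });
            }
        }
    }

    #[tokio::main]
    async fn main() {
        {
            let _thing = PooledThing { payload: Some("conn-1".to_string()) };
        } // dropped here; the return happens on a blocking thread
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }
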

vm-image-spec.yaml (new file, 143 lines)
@@ -0,0 +1,143 @@
+# Supplemental file for neondatabase/autoscaling's vm-builder, for producing the VM compute image.
+---
+commands:
+  - name: cgconfigparser
+    user: root
+    sysvInitAction: sysinit
+    shell: "cgconfigparser -l /etc/cgconfig.conf -s 1664"
+  - name: pgbouncer
+    user: nobody
+    sysvInitAction: respawn
+    shell: "/usr/local/bin/pgbouncer /etc/pgbouncer.ini"
+  - name: postgres-exporter
+    user: nobody
+    sysvInitAction: respawn
+    shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter --extend.query-path /etc/postgres_exporter_queries.yml'
+shutdownHook: |
+  su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
+files:
+  - filename: pgbouncer.ini
+    content: |
+      [databases]
+      *=host=localhost port=5432 auth_user=cloud_admin
+      [pgbouncer]
+      listen_port=6432
+      listen_addr=0.0.0.0
+      auth_type=scram-sha-256
+      auth_user=cloud_admin
+      auth_dbname=postgres
+      client_tls_sslmode=disable
+      server_tls_sslmode=disable
+      pool_mode=transaction
+      max_client_conn=10000
+      default_pool_size=16
+      max_prepared_statements=0
+  - filename: cgconfig.conf
+    content: |
+      # Configuration for cgroups in VM compute nodes
+      group neon-postgres {
+          perm {
+              admin {
+                  uid = postgres;
+              }
+              task {
+                  gid = users;
+              }
+          }
+          memory {}
+      }
+  - filename: postgres_exporter_queries.yml
+    content: |
+      postgres_exporter_pg_database_size:
+        query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as bytes, 42 as fourtytwo FROM pg_database"
+        cache_seconds: 30
+        metrics:
+          - datname:
+              usage: "LABEL"
+              description: "Name of the database"
+          - bytes:
+              usage: "GAUGE"
+              description: "Disk space used by the database"
+          - fourtytwo:
+              usage: "GAUGE"
+              description: "fourtytwo"
+build: |
+  # Build cgroup-tools
+  #
+  # At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically
+  # libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-monitor
+  # requires cgroup v2, so we'll build cgroup-tools ourselves.
+  FROM debian:bullseye-slim as libcgroup-builder
+  ENV LIBCGROUP_VERSION v2.0.3
+
+  RUN set -exu \
+      && apt update \
+      && apt install --no-install-recommends -y \
+          git \
+          ca-certificates \
+          automake \
+          cmake \
+          make \
+          gcc \
+          byacc \
+          flex \
+          libtool \
+          libpam0g-dev \
+      && git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \
+      && INSTALL_DIR="/libcgroup-install" \
+      && mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \
+      && cd libcgroup \
+      # extracted from bootstrap.sh, with modified flags:
+      && (test -d m4 || mkdir m4) \
+      && autoreconf -fi \
+      && rm -rf autom4te.cache \
+      && CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \
+      # actually build the thing...
+      && make install
+
+  FROM quay.io/prometheuscommunity/postgres-exporter:v0.12.0 AS postgres-exporter
+
+  # Build pgbouncer
+  #
+  FROM debian:bullseye-slim AS pgbouncer
+  RUN set -e \
+      && apt-get update \
+      && apt-get install -y \
+          curl \
+          build-essential \
+          pkg-config \
+          libevent-dev \
+          libssl-dev
+
+  ENV PGBOUNCER_VERSION 1.21.0
+  ENV PGBOUNCER_GITPATH 1_21_0
+  RUN set -e \
+      && curl -sfSL https://github.com/pgbouncer/pgbouncer/releases/download/pgbouncer_${PGBOUNCER_GITPATH}/pgbouncer-${PGBOUNCER_VERSION}.tar.gz -o pgbouncer-${PGBOUNCER_VERSION}.tar.gz \
+      && tar xzvf pgbouncer-${PGBOUNCER_VERSION}.tar.gz \
+      && cd pgbouncer-${PGBOUNCER_VERSION} \
+      && LDFLAGS=-static ./configure --prefix=/usr/local/pgbouncer --without-openssl \
+      && make -j $(nproc) \
+      && make install
+merge: |
+  # tweak nofile limits
+  RUN set -e \
+      && echo 'fs.file-max = 1048576' >>/etc/sysctl.conf \
+      && test ! -e /etc/security || ( \
+             echo '* - nofile 1048576' >>/etc/security/limits.conf \
+          && echo 'root - nofile 1048576' >>/etc/security/limits.conf \
+         )
+
+  COPY cgconfig.conf /etc/cgconfig.conf
+  COPY pgbouncer.ini /etc/pgbouncer.ini
+  COPY postgres_exporter_queries.yml /etc/postgres_exporter_queries.yml
+  RUN set -e \
+      && chown postgres:postgres /etc/pgbouncer.ini \
+      && chmod 0644 /etc/pgbouncer.ini \
+      && chmod 0644 /etc/cgconfig.conf \
+      && chmod 0644 /etc/postgres_exporter_queries.yml
+
+  COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
+  COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
+  COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/
+  COPY --from=postgres-exporter /bin/postgres_exporter /bin/postgres_exporter
+  COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer