mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
1 Commits
precreate_
...
RemoteExte
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
02f8650111 |
5
Cargo.lock
generated
5
Cargo.lock
generated
@@ -4079,7 +4079,6 @@ dependencies = [
|
||||
"clap",
|
||||
"consumption_metrics",
|
||||
"dashmap",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hashbrown 0.13.2",
|
||||
@@ -5740,7 +5739,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#d6a1c93442fb6b3a5bec490204961134e54925dc"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0e1af4ccddf2f01805cfc9eaefa97ee13c04b52d"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"nix 0.26.4",
|
||||
@@ -6265,7 +6264,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "uring-common"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#d6a1c93442fb6b3a5bec490204961134e54925dc"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0e1af4ccddf2f01805cfc9eaefa97ee13c04b52d"
|
||||
dependencies = [
|
||||
"io-uring",
|
||||
"libc",
|
||||
|
||||
@@ -100,11 +100,6 @@ RUN mkdir -p /data/.neon/ && chown -R neon:neon /data/.neon/ \
|
||||
-c "listen_pg_addr='0.0.0.0:6400'" \
|
||||
-c "listen_http_addr='0.0.0.0:9898'"
|
||||
|
||||
# When running a binary that links with libpq, default to using our most recent postgres version. Binaries
|
||||
# that want a particular postgres version will select it explicitly: this is just a default.
|
||||
ENV LD_LIBRARY_PATH /usr/local/v16/lib
|
||||
|
||||
|
||||
VOLUME ["/data"]
|
||||
USER neon
|
||||
EXPOSE 6400
|
||||
|
||||
@@ -135,7 +135,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.76.0
|
||||
ENV RUSTC_VERSION=1.75.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
|
||||
|
||||
2
NOTICE
2
NOTICE
@@ -1,5 +1,5 @@
|
||||
Neon
|
||||
Copyright 2022 - 2024 Neon Inc.
|
||||
Copyright 2022 Neon Inc.
|
||||
|
||||
The PostgreSQL submodules in vendor/ are licensed under the PostgreSQL license.
|
||||
See vendor/postgres-vX/COPYRIGHT for details.
|
||||
|
||||
@@ -1235,10 +1235,19 @@ LIMIT 100",
|
||||
|
||||
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
|
||||
|
||||
let build_tag_str = if spec
|
||||
.features
|
||||
.contains(&ComputeFeature::RemoteExtensionsUseLatest)
|
||||
{
|
||||
"latest"
|
||||
} else {
|
||||
&self.build_tag
|
||||
};
|
||||
|
||||
let mut download_tasks = Vec::new();
|
||||
for library in &libs_vec {
|
||||
let (ext_name, ext_path) =
|
||||
remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
|
||||
remote_extensions.get_ext(library, true, build_tag_str, &self.pgversion)?;
|
||||
download_tasks.push(self.download_extension(ext_name, ext_path));
|
||||
}
|
||||
let results = join_all(download_tasks).await;
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::thread;
|
||||
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_api::requests::ConfigurationRequest;
|
||||
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
|
||||
use compute_api::spec::ComputeFeature;
|
||||
|
||||
use anyhow::Result;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
@@ -171,12 +172,16 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
};
|
||||
|
||||
remote_extensions.get_ext(
|
||||
&filename,
|
||||
is_library,
|
||||
&compute.build_tag,
|
||||
&compute.pgversion,
|
||||
)
|
||||
let build_tag_str = if spec
|
||||
.features
|
||||
.contains(&ComputeFeature::RemoteExtensionsUseLatest)
|
||||
{
|
||||
"latest"
|
||||
} else {
|
||||
&compute.build_tag
|
||||
};
|
||||
|
||||
remote_extensions.get_ext(&filename, is_library, build_tag_str, &compute.pgversion)
|
||||
};
|
||||
|
||||
match ext {
|
||||
|
||||
@@ -264,10 +264,9 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
||||
// case we miss some events for some reason. Not strictly necessary, but
|
||||
// better safe than sorry.
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let watcher_res = notify::recommended_watcher(move |res| {
|
||||
let (mut watcher, rx): (Box<dyn Watcher>, _) = match notify::recommended_watcher(move |res| {
|
||||
let _ = tx.send(res);
|
||||
});
|
||||
let (mut watcher, rx): (Box<dyn Watcher>, _) = match watcher_res {
|
||||
}) {
|
||||
Ok(watcher) => (Box::new(watcher), rx),
|
||||
Err(e) => {
|
||||
match e.kind {
|
||||
|
||||
@@ -7,7 +7,6 @@ CREATE TABLE tenant_shards (
|
||||
generation INTEGER NOT NULL,
|
||||
generation_pageserver BIGINT NOT NULL,
|
||||
placement_policy VARCHAR NOT NULL,
|
||||
splitting SMALLINT NOT NULL,
|
||||
-- config is JSON encoded, opaque to the database.
|
||||
config TEXT NOT NULL
|
||||
);
|
||||
@@ -3,8 +3,7 @@ use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
|
||||
use hyper::{Body, Request, Response};
|
||||
use hyper::{StatusCode, Uri};
|
||||
use pageserver_api::models::{
|
||||
TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
|
||||
TimelineCreateRequest,
|
||||
TenantCreateRequest, TenantLocationConfigRequest, TimelineCreateRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
@@ -42,7 +41,7 @@ pub struct HttpState {
|
||||
|
||||
impl HttpState {
|
||||
pub fn new(service: Arc<crate::service::Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
|
||||
let allowlist_routes = ["/status", "/ready", "/metrics"]
|
||||
let allowlist_routes = ["/status"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
@@ -293,19 +292,6 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
|
||||
json_response(StatusCode::OK, state.service.node_configure(config_req)?)
|
||||
}
|
||||
|
||||
async fn handle_tenant_shard_split(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
service.tenant_shard_split(tenant_id, split_req).await?,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_shard_migrate(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
@@ -325,17 +311,6 @@ async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError>
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
|
||||
/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
|
||||
async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let state = get_state(&req);
|
||||
if state.service.startup_complete.is_ready() {
|
||||
json_response(StatusCode::OK, ())
|
||||
} else {
|
||||
json_response(StatusCode::SERVICE_UNAVAILABLE, ())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ReconcileError> for ApiError {
|
||||
fn from(value: ReconcileError) -> Self {
|
||||
ApiError::Conflict(format!("Reconciliation error: {}", value))
|
||||
@@ -391,7 +366,6 @@ pub fn make_router(
|
||||
.data(Arc::new(HttpState::new(service, auth)))
|
||||
// Non-prefixed generic endpoints (status, metrics)
|
||||
.get("/status", |r| request_span(r, handle_status))
|
||||
.get("/ready", |r| request_span(r, handle_ready))
|
||||
// Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
|
||||
.post("/upcall/v1/re-attach", |r| {
|
||||
request_span(r, handle_re_attach)
|
||||
@@ -417,9 +391,6 @@ pub fn make_router(
|
||||
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
|
||||
tenant_service_handler(r, handle_tenant_shard_migrate)
|
||||
})
|
||||
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
|
||||
tenant_service_handler(r, handle_tenant_shard_split)
|
||||
})
|
||||
// Tenant operations
|
||||
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
|
||||
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
pub(crate) mod split_state;
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use self::split_state::SplitState;
|
||||
use camino::Utf8Path;
|
||||
use camino::Utf8PathBuf;
|
||||
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
|
||||
@@ -365,101 +363,19 @@ impl Persistence {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// When we start shard splitting, we must durably mark the tenant so that
|
||||
// on restart, we know that we must go through recovery.
|
||||
//
|
||||
// We create the child shards here, so that they will be available for increment_generation calls
|
||||
// if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
|
||||
// TODO: when we start shard splitting, we must durably mark the tenant so that
|
||||
// on restart, we know that we must go through recovery (list shards that exist
|
||||
// and pick up where we left off and/or revert to parent shards).
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn begin_shard_split(
|
||||
&self,
|
||||
old_shard_count: ShardCount,
|
||||
split_tenant_id: TenantId,
|
||||
parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
self.with_conn(move |conn| -> DatabaseResult<()> {
|
||||
conn.transaction(|conn| -> DatabaseResult<()> {
|
||||
// Mark parent shards as splitting
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(old_shard_count.0 as i32))
|
||||
.set((splitting.eq(1),))
|
||||
.execute(conn)?;
|
||||
if ShardCount(updated.try_into().map_err(|_| DatabaseError::Logical(format!("Overflow existing shard count {} while splitting", updated)))?) != old_shard_count {
|
||||
// Perhaps a deletion or another split raced with this attempt to split, mutating
|
||||
// the parent shards that we intend to split. In this case the split request should fail.
|
||||
return Err(DatabaseError::Logical(
|
||||
format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {old_shard_count:?})")
|
||||
));
|
||||
}
|
||||
|
||||
// FIXME: spurious clone to sidestep closure move rules
|
||||
let parent_to_children = parent_to_children.clone();
|
||||
|
||||
// Insert child shards
|
||||
for (parent_shard_id, children) in parent_to_children {
|
||||
let mut parent = crate::schema::tenant_shards::table
|
||||
.filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(parent_shard_id.shard_count.0 as i32))
|
||||
.load::<TenantShardPersistence>(conn)?;
|
||||
let parent = if parent.len() != 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Parent shard {parent_shard_id} not found"
|
||||
)));
|
||||
} else {
|
||||
parent.pop().unwrap()
|
||||
};
|
||||
for mut shard in children {
|
||||
// Carry the parent's generation into the child
|
||||
shard.generation = parent.generation;
|
||||
|
||||
debug_assert!(shard.splitting == SplitState::Splitting);
|
||||
diesel::insert_into(tenant_shards)
|
||||
.values(shard)
|
||||
.execute(conn)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
pub(crate) async fn begin_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
// When we finish shard splitting, we must atomically clean up the old shards
|
||||
// TODO: when we finish shard splitting, we must atomically clean up the old shards
|
||||
// and insert the new shards, and clear the splitting marker.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn complete_shard_split(
|
||||
&self,
|
||||
split_tenant_id: TenantId,
|
||||
old_shard_count: ShardCount,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
self.with_conn(move |conn| -> DatabaseResult<()> {
|
||||
conn.transaction(|conn| -> QueryResult<()> {
|
||||
// Drop parent shards
|
||||
diesel::delete(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(old_shard_count.0 as i32))
|
||||
.execute(conn)?;
|
||||
|
||||
// Clear sharding flag
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.set((splitting.eq(0),))
|
||||
.execute(conn)?;
|
||||
debug_assert!(updated > 0);
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
pub(crate) async fn complete_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
|
||||
todo!();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -487,8 +403,6 @@ pub(crate) struct TenantShardPersistence {
|
||||
#[serde(default)]
|
||||
pub(crate) placement_policy: String,
|
||||
#[serde(default)]
|
||||
pub(crate) splitting: SplitState,
|
||||
#[serde(default)]
|
||||
pub(crate) config: String,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
use diesel::pg::{Pg, PgValue};
|
||||
use diesel::{
|
||||
deserialize::FromSql, deserialize::FromSqlRow, expression::AsExpression, serialize::ToSql,
|
||||
sql_types::Int2,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, FromSqlRow, AsExpression)]
|
||||
#[diesel(sql_type = SplitStateSQLRepr)]
|
||||
#[derive(Deserialize, Serialize)]
|
||||
pub enum SplitState {
|
||||
Idle = 0,
|
||||
Splitting = 1,
|
||||
}
|
||||
|
||||
impl Default for SplitState {
|
||||
fn default() -> Self {
|
||||
Self::Idle
|
||||
}
|
||||
}
|
||||
|
||||
type SplitStateSQLRepr = Int2;
|
||||
|
||||
impl ToSql<SplitStateSQLRepr, Pg> for SplitState {
|
||||
fn to_sql<'a>(
|
||||
&'a self,
|
||||
out: &'a mut diesel::serialize::Output<Pg>,
|
||||
) -> diesel::serialize::Result {
|
||||
let raw_value: i16 = *self as i16;
|
||||
let mut new_out = out.reborrow();
|
||||
ToSql::<SplitStateSQLRepr, Pg>::to_sql(&raw_value, &mut new_out)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromSql<SplitStateSQLRepr, Pg> for SplitState {
|
||||
fn from_sql(pg_value: PgValue) -> diesel::deserialize::Result<Self> {
|
||||
match FromSql::<SplitStateSQLRepr, Pg>::from_sql(pg_value).map(|v| match v {
|
||||
0 => Some(Self::Idle),
|
||||
1 => Some(Self::Splitting),
|
||||
_ => None,
|
||||
})? {
|
||||
Some(v) => Ok(v),
|
||||
None => Err(format!("Invalid SplitState value, was: {:?}", pg_value.as_bytes()).into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -20,7 +20,6 @@ diesel::table! {
|
||||
generation -> Int4,
|
||||
generation_pageserver -> Int8,
|
||||
placement_policy -> Varchar,
|
||||
splitting -> Int2,
|
||||
config -> Text,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
collections::{BTreeMap, HashMap},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
@@ -24,14 +23,13 @@ use pageserver_api::{
|
||||
models::{
|
||||
LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
|
||||
TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
|
||||
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
|
||||
};
|
||||
use pageserver_client::mgmt_api;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
backoff,
|
||||
completion::Barrier,
|
||||
generation::Generation,
|
||||
http::error::ApiError,
|
||||
@@ -42,11 +40,7 @@ use utils::{
|
||||
use crate::{
|
||||
compute_hook::{self, ComputeHook},
|
||||
node::Node,
|
||||
persistence::{
|
||||
split_state::SplitState, DatabaseError, NodePersistence, Persistence,
|
||||
TenantShardPersistence,
|
||||
},
|
||||
reconciler::attached_location_conf,
|
||||
persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence},
|
||||
scheduler::Scheduler,
|
||||
tenant_state::{
|
||||
IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
|
||||
@@ -151,71 +145,31 @@ impl Service {
|
||||
// indeterminate, same as in [`ObservedStateLocation`])
|
||||
let mut observed = HashMap::new();
|
||||
|
||||
let mut nodes_online = HashSet::new();
|
||||
|
||||
// TODO: give Service a cancellation token for clean shutdown
|
||||
let cancel = CancellationToken::new();
|
||||
let nodes = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.nodes.clone()
|
||||
};
|
||||
|
||||
// TODO: issue these requests concurrently
|
||||
{
|
||||
let nodes = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.nodes.clone()
|
||||
};
|
||||
for node in nodes.values() {
|
||||
let http_client = reqwest::ClientBuilder::new()
|
||||
.timeout(Duration::from_secs(5))
|
||||
.build()
|
||||
.expect("Failed to construct HTTP client");
|
||||
let client = mgmt_api::Client::from_client(
|
||||
http_client,
|
||||
node.base_url(),
|
||||
self.config.jwt_token.as_deref(),
|
||||
);
|
||||
for node in nodes.values() {
|
||||
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
|
||||
|
||||
fn is_fatal(e: &mgmt_api::Error) -> bool {
|
||||
use mgmt_api::Error::*;
|
||||
match e {
|
||||
ReceiveBody(_) | ReceiveErrorBody(_) => false,
|
||||
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
|
||||
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
|
||||
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
|
||||
ApiError(_, _) => true,
|
||||
}
|
||||
tracing::info!("Scanning shards on node {}...", node.id);
|
||||
match client.list_location_config().await {
|
||||
Err(e) => {
|
||||
tracing::warn!("Could not contact pageserver {} ({e})", node.id);
|
||||
// TODO: be more tolerant, apply a generous 5-10 second timeout with retries, in case
|
||||
// pageserver is being restarted at the same time as we are
|
||||
}
|
||||
Ok(listing) => {
|
||||
tracing::info!(
|
||||
"Received {} shard statuses from pageserver {}, setting it to Active",
|
||||
listing.tenant_shards.len(),
|
||||
node.id
|
||||
);
|
||||
|
||||
let list_response = backoff::retry(
|
||||
|| client.list_location_config(),
|
||||
is_fatal,
|
||||
1,
|
||||
5,
|
||||
"Location config listing",
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
let Some(list_response) = list_response else {
|
||||
tracing::info!("Shutdown during startup_reconcile");
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::info!("Scanning shards on node {}...", node.id);
|
||||
match list_response {
|
||||
Err(e) => {
|
||||
tracing::warn!("Could not contact pageserver {} ({e})", node.id);
|
||||
// TODO: be more tolerant, do some retries, in case
|
||||
// pageserver is being restarted at the same time as we are
|
||||
}
|
||||
Ok(listing) => {
|
||||
tracing::info!(
|
||||
"Received {} shard statuses from pageserver {}, setting it to Active",
|
||||
listing.tenant_shards.len(),
|
||||
node.id
|
||||
);
|
||||
nodes_online.insert(node.id);
|
||||
|
||||
for (tenant_shard_id, conf_opt) in listing.tenant_shards {
|
||||
observed.insert(tenant_shard_id, (node.id, conf_opt));
|
||||
}
|
||||
for (tenant_shard_id, conf_opt) in listing.tenant_shards {
|
||||
observed.insert(tenant_shard_id, (node.id, conf_opt));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -226,19 +180,8 @@ impl Service {
|
||||
let mut compute_notifications = Vec::new();
|
||||
|
||||
// Populate intent and observed states for all tenants, based on reported state on pageservers
|
||||
let (shard_count, nodes) = {
|
||||
let shard_count = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
|
||||
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
|
||||
let mut nodes = (*locked.nodes).clone();
|
||||
for (node_id, node) in nodes.iter_mut() {
|
||||
if nodes_online.contains(node_id) {
|
||||
node.availability = NodeAvailability::Active;
|
||||
}
|
||||
}
|
||||
locked.nodes = Arc::new(nodes);
|
||||
let nodes = locked.nodes.clone();
|
||||
|
||||
for (tenant_shard_id, (node_id, observed_loc)) in observed {
|
||||
let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
|
||||
cleanup.push((tenant_shard_id, node_id));
|
||||
@@ -270,7 +213,7 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
(locked.tenants.len(), nodes)
|
||||
locked.tenants.len()
|
||||
};
|
||||
|
||||
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
|
||||
@@ -331,8 +274,9 @@ impl Service {
|
||||
let stream = futures::stream::iter(compute_notifications.into_iter())
|
||||
.map(|(tenant_shard_id, node_id)| {
|
||||
let compute_hook = compute_hook.clone();
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
// TODO: give Service a cancellation token for clean shutdown
|
||||
let cancel = CancellationToken::new();
|
||||
if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
|
||||
tracing::error!(
|
||||
tenant_shard_id=%tenant_shard_id,
|
||||
@@ -438,7 +382,7 @@ impl Service {
|
||||
))),
|
||||
config,
|
||||
persistence,
|
||||
startup_complete: startup_complete.clone(),
|
||||
startup_complete,
|
||||
});
|
||||
|
||||
let result_task_this = this.clone();
|
||||
@@ -532,7 +476,6 @@ impl Service {
|
||||
generation_pageserver: i64::MAX,
|
||||
placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
splitting: SplitState::default(),
|
||||
};
|
||||
|
||||
match self.persistence.insert_tenant_shards(vec![tsp]).await {
|
||||
@@ -775,7 +718,6 @@ impl Service {
|
||||
generation_pageserver: i64::MAX,
|
||||
placement_policy: serde_json::to_string(&placement_policy).unwrap(),
|
||||
config: serde_json::to_string(&create_req.config).unwrap(),
|
||||
splitting: SplitState::default(),
|
||||
})
|
||||
.collect();
|
||||
self.persistence
|
||||
@@ -1035,10 +977,6 @@ impl Service {
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: if we timeout/fail on reconcile, we should still succeed this request,
|
||||
// because otherwise a broken compute hook causes a feedback loop where
|
||||
// location_config returns 500 and gets retried forever.
|
||||
|
||||
if let Some(create_req) = maybe_create {
|
||||
let create_resp = self.tenant_create(create_req).await?;
|
||||
result.shards = create_resp
|
||||
@@ -1162,7 +1100,6 @@ impl Service {
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
|
||||
// TODO: refuse to do this if shard splitting is in progress
|
||||
// (https://github.com/neondatabase/neon/issues/6676)
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
@@ -1243,7 +1180,6 @@ impl Service {
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
|
||||
// TODO: refuse to do this if shard splitting is in progress
|
||||
// (https://github.com/neondatabase/neon/issues/6676)
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
@@ -1416,326 +1352,6 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_shard_split(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
split_req: TenantShardSplitRequest,
|
||||
) -> Result<TenantShardSplitResponse, ApiError> {
|
||||
let mut policy = None;
|
||||
let mut shard_ident = None;
|
||||
|
||||
// TODO: put a cancellation token on Service for clean shutdown
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// A parent shard which will be split
|
||||
struct SplitTarget {
|
||||
parent_id: TenantShardId,
|
||||
node: Node,
|
||||
child_ids: Vec<TenantShardId>,
|
||||
}
|
||||
|
||||
// Validate input, and calculate which shards we will create
|
||||
let (old_shard_count, targets, compute_hook) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
let pageservers = locked.nodes.clone();
|
||||
|
||||
let mut targets = Vec::new();
|
||||
|
||||
// In case this is a retry, count how many already-split shards we found
|
||||
let mut children_found = Vec::new();
|
||||
let mut old_shard_count = None;
|
||||
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
match shard.shard.count.0.cmp(&split_req.new_shard_count) {
|
||||
Ordering::Equal => {
|
||||
// Already split this
|
||||
children_found.push(*tenant_shard_id);
|
||||
continue;
|
||||
}
|
||||
Ordering::Greater => {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Requested count {} but already have shards at count {}",
|
||||
split_req.new_shard_count,
|
||||
shard.shard.count.0
|
||||
)));
|
||||
}
|
||||
Ordering::Less => {
|
||||
// Fall through: this shard has lower count than requested,
|
||||
// is a candidate for splitting.
|
||||
}
|
||||
}
|
||||
|
||||
match old_shard_count {
|
||||
None => old_shard_count = Some(shard.shard.count),
|
||||
Some(old_shard_count) => {
|
||||
if old_shard_count != shard.shard.count {
|
||||
// We may hit this case if a caller asked for two splits to
|
||||
// different sizes, before the first one is complete.
|
||||
// e.g. 1->2, 2->4, where the 4 call comes while we have a mixture
|
||||
// of shard_count=1 and shard_count=2 shards in the map.
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot split, currently mid-split".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
if policy.is_none() {
|
||||
policy = Some(shard.policy.clone());
|
||||
}
|
||||
if shard_ident.is_none() {
|
||||
shard_ident = Some(shard.shard);
|
||||
}
|
||||
|
||||
if tenant_shard_id.shard_count == ShardCount(split_req.new_shard_count) {
|
||||
tracing::info!(
|
||||
"Tenant shard {} already has shard count {}",
|
||||
tenant_shard_id,
|
||||
split_req.new_shard_count
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let node_id =
|
||||
shard
|
||||
.intent
|
||||
.attached
|
||||
.ok_or(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Cannot split a tenant that is not attached"
|
||||
)))?;
|
||||
|
||||
let node = pageservers
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
// TODO: if any reconciliation is currently in progress for this shard, wait for it.
|
||||
|
||||
targets.push(SplitTarget {
|
||||
parent_id: *tenant_shard_id,
|
||||
node: node.clone(),
|
||||
child_ids: tenant_shard_id.split(ShardCount(split_req.new_shard_count)),
|
||||
});
|
||||
}
|
||||
|
||||
if targets.is_empty() {
|
||||
if children_found.len() == split_req.new_shard_count as usize {
|
||||
return Ok(TenantShardSplitResponse {
|
||||
new_shards: children_found,
|
||||
});
|
||||
} else {
|
||||
// No shards found to split, and no existing children found: the
|
||||
// tenant doesn't exist at all.
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
(old_shard_count, targets, locked.compute_hook.clone())
|
||||
};
|
||||
|
||||
// unwrap safety: we would have returned above if we didn't find at least one shard to split
|
||||
let old_shard_count = old_shard_count.unwrap();
|
||||
let shard_ident = shard_ident.unwrap();
|
||||
let policy = policy.unwrap();
|
||||
|
||||
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
|
||||
// request could occur here, deleting or mutating the tenant. begin_shard_split checks that the
|
||||
// parent shards exist as expected, but it would be neater to do the above pre-checks within the
|
||||
// same database transaction rather than pre-check in-memory and then maybe-fail the database write.
|
||||
// (https://github.com/neondatabase/neon/issues/6676)
|
||||
|
||||
// Before creating any new child shards in memory or on the pageservers, persist them: this
|
||||
// enables us to ensure that we will always be able to clean up if something goes wrong. This also
|
||||
// acts as the protection against two concurrent attempts to split: one of them will get a database
|
||||
// error trying to insert the child shards.
|
||||
let mut child_tsps = Vec::new();
|
||||
for target in &targets {
|
||||
let mut this_child_tsps = Vec::new();
|
||||
for child in &target.child_ids {
|
||||
let mut child_shard = shard_ident;
|
||||
child_shard.number = child.shard_number;
|
||||
child_shard.count = child.shard_count;
|
||||
|
||||
this_child_tsps.push(TenantShardPersistence {
|
||||
tenant_id: child.tenant_id.to_string(),
|
||||
shard_number: child.shard_number.0 as i32,
|
||||
shard_count: child.shard_count.0 as i32,
|
||||
shard_stripe_size: shard_ident.stripe_size.0 as i32,
|
||||
// Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will
|
||||
// populate the correct generation as part of its transaction, to protect us
|
||||
// against racing with changes in the state of the parent.
|
||||
generation: 0,
|
||||
generation_pageserver: target.node.id.0 as i64,
|
||||
placement_policy: serde_json::to_string(&policy).unwrap(),
|
||||
// TODO: get the config out of the map
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
splitting: SplitState::Splitting,
|
||||
});
|
||||
}
|
||||
|
||||
child_tsps.push((target.parent_id, this_child_tsps));
|
||||
}
|
||||
|
||||
if let Err(e) = self
|
||||
.persistence
|
||||
.begin_shard_split(old_shard_count, tenant_id, child_tsps)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
DatabaseError::Query(diesel::result::Error::DatabaseError(
|
||||
DatabaseErrorKind::UniqueViolation,
|
||||
_,
|
||||
)) => {
|
||||
// Inserting a child shard violated a unique constraint: we raced with another call to
|
||||
// this function
|
||||
tracing::warn!("Conflicting attempt to split {tenant_id}: {e}");
|
||||
return Err(ApiError::Conflict("Tenant is already splitting".into()));
|
||||
}
|
||||
_ => return Err(ApiError::InternalServerError(e.into())),
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: we have now committed the shard split state to the database, so any subsequent
|
||||
// failure needs to roll it back. We will later wrap this function in logic to roll back
|
||||
// the split if it fails.
|
||||
// (https://github.com/neondatabase/neon/issues/6676)
|
||||
|
||||
// TODO: issue split calls concurrently (this only matters once we're splitting
|
||||
// N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
|
||||
|
||||
for target in &targets {
|
||||
let SplitTarget {
|
||||
parent_id,
|
||||
node,
|
||||
child_ids,
|
||||
} = target;
|
||||
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
|
||||
let response = client
|
||||
.tenant_shard_split(
|
||||
*parent_id,
|
||||
TenantShardSplitRequest {
|
||||
new_shard_count: split_req.new_shard_count,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?;
|
||||
|
||||
tracing::info!(
|
||||
"Split {} into {}",
|
||||
parent_id,
|
||||
response
|
||||
.new_shards
|
||||
.iter()
|
||||
.map(|s| format!("{:?}", s))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
);
|
||||
|
||||
if &response.new_shards != child_ids {
|
||||
// This should never happen: the pageserver should agree with us on how shard splits work.
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Splitting shard {} resulted in unexpected IDs: {:?} (expected {:?})",
|
||||
parent_id,
|
||||
response.new_shards,
|
||||
child_ids
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: if the pageserver restarted concurrently with our split API call,
|
||||
// the actual generation of the child shard might differ from the generation
|
||||
// we expect it to have. In order for our in-database generation to end up
|
||||
// correct, we should carry the child generation back in the response and apply it here
|
||||
// in complete_shard_split (and apply the correct generation in memory)
|
||||
// (or, we can carry generation in the request and reject the request if
|
||||
// it doesn't match, but that requires more retry logic on this side)
|
||||
|
||||
self.persistence
|
||||
.complete_shard_split(tenant_id, old_shard_count)
|
||||
.await?;
|
||||
|
||||
// Replace all the shards we just split with their children
|
||||
let mut response = TenantShardSplitResponse {
|
||||
new_shards: Vec::new(),
|
||||
};
|
||||
let mut child_locations = Vec::new();
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for target in targets {
|
||||
let SplitTarget {
|
||||
parent_id,
|
||||
node: _node,
|
||||
child_ids,
|
||||
} = target;
|
||||
let (pageserver, generation, config) = {
|
||||
let old_state = locked
|
||||
.tenants
|
||||
.remove(&parent_id)
|
||||
.expect("It was present, we just split it");
|
||||
(
|
||||
old_state.intent.attached.unwrap(),
|
||||
old_state.generation,
|
||||
old_state.config.clone(),
|
||||
)
|
||||
};
|
||||
|
||||
locked.tenants.remove(&parent_id);
|
||||
|
||||
for child in child_ids {
|
||||
let mut child_shard = shard_ident;
|
||||
child_shard.number = child.shard_number;
|
||||
child_shard.count = child.shard_count;
|
||||
|
||||
let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
|
||||
child_observed.insert(
|
||||
pageserver,
|
||||
ObservedStateLocation {
|
||||
conf: Some(attached_location_conf(generation, &child_shard, &config)),
|
||||
},
|
||||
);
|
||||
|
||||
let mut child_state = TenantState::new(child, child_shard, policy.clone());
|
||||
child_state.intent = IntentState::single(Some(pageserver));
|
||||
child_state.observed = ObservedState {
|
||||
locations: child_observed,
|
||||
};
|
||||
child_state.generation = generation;
|
||||
child_state.config = config.clone();
|
||||
|
||||
child_locations.push((child, pageserver));
|
||||
|
||||
locked.tenants.insert(child, child_state);
|
||||
response.new_shards.push(child);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send compute notifications for all the new shards
|
||||
let mut failed_notifications = Vec::new();
|
||||
for (child_id, child_ps) in child_locations {
|
||||
if let Err(e) = compute_hook.notify(child_id, child_ps, &cancel).await {
|
||||
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
|
||||
child_id, child_ps);
|
||||
failed_notifications.push(child_id);
|
||||
}
|
||||
}
|
||||
|
||||
// If we failed any compute notifications, make a note to retry later.
|
||||
if !failed_notifications.is_empty() {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for failed in failed_notifications {
|
||||
if let Some(shard) = locked.tenants.get_mut(&failed) {
|
||||
shard.pending_compute_notification = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_shard_migrate(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -193,13 +193,6 @@ impl IntentState {
|
||||
result
|
||||
}
|
||||
|
||||
pub(crate) fn single(node_id: Option<NodeId>) -> Self {
|
||||
Self {
|
||||
attached: node_id,
|
||||
secondary: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
/// When a node goes offline, we update intents to avoid using it
|
||||
/// as their attached pageserver.
|
||||
///
|
||||
@@ -293,9 +286,6 @@ impl TenantState {
|
||||
// self.intent refers to pageservers that are offline, and pick other
|
||||
// pageservers if so.
|
||||
|
||||
// TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
|
||||
// change their attach location.
|
||||
|
||||
// Build the set of pageservers already in use by this tenant, to avoid scheduling
|
||||
// more work on the same pageservers we're already using.
|
||||
let mut used_pageservers = self.intent.all_pageservers();
|
||||
|
||||
@@ -8,10 +8,7 @@ use diesel::{
|
||||
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
|
||||
use hyper::Method;
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
ShardParameters, TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
|
||||
@@ -651,7 +648,7 @@ impl AttachmentService {
|
||||
) -> anyhow::Result<TenantShardMigrateResponse> {
|
||||
self.dispatch(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
|
||||
format!("tenant/{tenant_shard_id}/migrate"),
|
||||
Some(TenantShardMigrateRequest {
|
||||
tenant_shard_id,
|
||||
node_id,
|
||||
@@ -660,20 +657,6 @@ impl AttachmentService {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
|
||||
pub async fn tenant_split(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
new_shard_count: u8,
|
||||
) -> anyhow::Result<TenantShardSplitResponse> {
|
||||
self.dispatch(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_id}/shard_split"),
|
||||
Some(TenantShardSplitRequest { new_shard_count }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(node_id=%req.node_id))]
|
||||
pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
|
||||
self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
|
||||
|
||||
@@ -72,6 +72,7 @@ where
|
||||
let log_path = datadir.join(format!("{process_name}.log"));
|
||||
let process_log_file = fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.append(true)
|
||||
.open(&log_path)
|
||||
.with_context(|| {
|
||||
|
||||
@@ -575,26 +575,6 @@ async fn handle_tenant(
|
||||
println!("{tenant_table}");
|
||||
println!("{shard_table}");
|
||||
}
|
||||
Some(("shard-split", matches)) => {
|
||||
let tenant_id = get_tenant_id(matches, env)?;
|
||||
let shard_count: u8 = matches.get_one::<u8>("shard-count").cloned().unwrap_or(0);
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
let result = attachment_service
|
||||
.tenant_split(tenant_id, shard_count)
|
||||
.await?;
|
||||
println!(
|
||||
"Split tenant {} into shards {}",
|
||||
tenant_id,
|
||||
result
|
||||
.new_shards
|
||||
.iter()
|
||||
.map(|s| format!("{:?}", s))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
);
|
||||
}
|
||||
|
||||
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
|
||||
None => bail!("no tenant subcommand provided"),
|
||||
}
|
||||
@@ -1544,11 +1524,6 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("status")
|
||||
.about("Human readable summary of the tenant's shards and attachment locations")
|
||||
.arg(tenant_id_arg.clone()))
|
||||
.subcommand(Command::new("shard-split")
|
||||
.about("Increase the number of shards in the tenant")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("pageserver")
|
||||
|
||||
@@ -90,6 +90,11 @@ pub enum ComputeFeature {
|
||||
/// track short-lived connections as user activity.
|
||||
ActivityMonitorExperimental,
|
||||
|
||||
// Use latest version of remote extensions
|
||||
// This is needed to allow us to test new versions of extensions before
|
||||
// they are merged into the main branch.
|
||||
RemoteExtensionsUseLatest,
|
||||
|
||||
/// This is a special feature flag that is used to represent unknown feature flags.
|
||||
/// Basically all unknown to enum flags are represented as this one. See unit test
|
||||
/// `parse_unknown_features()` for more details.
|
||||
@@ -152,8 +157,12 @@ impl RemoteExtSpec {
|
||||
//
|
||||
// Keep it in sync with path generation in
|
||||
// https://github.com/neondatabase/build-custom-extensions/tree/main
|
||||
//
|
||||
// if ComputeFeature::RemoteExtensionsUseLatest is enabled
|
||||
// use "latest" as the build_tag
|
||||
let archive_path_str =
|
||||
format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
|
||||
|
||||
Ok((
|
||||
real_ext_name.to_string(),
|
||||
RemotePath::from_string(&archive_path_str)?,
|
||||
|
||||
@@ -192,16 +192,6 @@ pub struct TimelineCreateRequest {
|
||||
pub pg_version: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantShardSplitRequest {
|
||||
pub new_shard_count: u8,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantShardSplitResponse {
|
||||
pub new_shards: Vec<TenantShardId>,
|
||||
}
|
||||
|
||||
/// Parameters that apply to all shards in a tenant. Used during tenant creation.
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
|
||||
@@ -88,36 +88,12 @@ impl TenantShardId {
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
|
||||
}
|
||||
|
||||
/// Convenience for dropping the tenant_id and just getting the ShardIndex: this
|
||||
/// is useful when logging from code that is already in a span that includes tenant ID, to
|
||||
/// keep messages reasonably terse.
|
||||
pub fn to_index(&self) -> ShardIndex {
|
||||
ShardIndex {
|
||||
shard_number: self.shard_number,
|
||||
shard_count: self.shard_count,
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate the children of this TenantShardId when splitting the overall tenant into
|
||||
/// the given number of shards.
|
||||
pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
|
||||
let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
|
||||
let mut child_shards = Vec::new();
|
||||
for shard_number in 0..ShardNumber(new_shard_count.0).0 {
|
||||
// Key mapping is based on a round robin mapping of key hash modulo shard count,
|
||||
// so our child shards are the ones which the same keys would map to.
|
||||
if shard_number % effective_old_shard_count == self.shard_number.0 {
|
||||
child_shards.push(TenantShardId {
|
||||
tenant_id: self.tenant_id,
|
||||
shard_number: ShardNumber(shard_number),
|
||||
shard_count: new_shard_count,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
child_shards
|
||||
}
|
||||
}
|
||||
|
||||
/// Formatting helper
|
||||
@@ -817,108 +793,4 @@ mod tests {
|
||||
let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
|
||||
assert_eq!(shard, ShardNumber(8));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shard_id_split() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let parent = TenantShardId::unsharded(tenant_id);
|
||||
|
||||
// Unsharded into 2
|
||||
assert_eq!(
|
||||
parent.split(ShardCount(2)),
|
||||
vec![
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(2),
|
||||
shard_number: ShardNumber(0)
|
||||
},
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(2),
|
||||
shard_number: ShardNumber(1)
|
||||
}
|
||||
]
|
||||
);
|
||||
|
||||
// Unsharded into 4
|
||||
assert_eq!(
|
||||
parent.split(ShardCount(4)),
|
||||
vec![
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(4),
|
||||
shard_number: ShardNumber(0)
|
||||
},
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(4),
|
||||
shard_number: ShardNumber(1)
|
||||
},
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(4),
|
||||
shard_number: ShardNumber(2)
|
||||
},
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(4),
|
||||
shard_number: ShardNumber(3)
|
||||
}
|
||||
]
|
||||
);
|
||||
|
||||
// count=1 into 2 (check this works the same as unsharded.)
|
||||
let parent = TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(1),
|
||||
shard_number: ShardNumber(0),
|
||||
};
|
||||
assert_eq!(
|
||||
parent.split(ShardCount(2)),
|
||||
vec![
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(2),
|
||||
shard_number: ShardNumber(0)
|
||||
},
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(2),
|
||||
shard_number: ShardNumber(1)
|
||||
}
|
||||
]
|
||||
);
|
||||
|
||||
// count=2 into count=8
|
||||
let parent = TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(2),
|
||||
shard_number: ShardNumber(1),
|
||||
};
|
||||
assert_eq!(
|
||||
parent.split(ShardCount(8)),
|
||||
vec![
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(8),
|
||||
shard_number: ShardNumber(1)
|
||||
},
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(8),
|
||||
shard_number: ShardNumber(3)
|
||||
},
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(8),
|
||||
shard_number: ShardNumber(5)
|
||||
},
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_count: ShardCount(8),
|
||||
shard_number: ShardNumber(7)
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,11 +27,6 @@ impl Barrier {
|
||||
b.wait().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Return true if a call to wait() would complete immediately
|
||||
pub fn is_ready(&self) -> bool {
|
||||
futures::future::FutureExt::now_or_never(self.0.wait()).is_some()
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Barrier {
|
||||
|
||||
@@ -69,44 +69,37 @@ impl<T> OnceCell<T> {
|
||||
F: FnOnce(InitPermit) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
|
||||
{
|
||||
loop {
|
||||
let sem = {
|
||||
let guard = self.inner.write().await;
|
||||
if guard.value.is_some() {
|
||||
return Ok(GuardMut(guard));
|
||||
}
|
||||
guard.init_semaphore.clone()
|
||||
};
|
||||
|
||||
{
|
||||
let permit = {
|
||||
// increment the count for the duration of queued
|
||||
let _guard = CountWaitingInitializers::start(self);
|
||||
sem.acquire().await
|
||||
};
|
||||
|
||||
let Ok(permit) = permit else {
|
||||
let guard = self.inner.write().await;
|
||||
if !Arc::ptr_eq(&sem, &guard.init_semaphore) {
|
||||
// there was a take_and_deinit in between
|
||||
continue;
|
||||
}
|
||||
assert!(
|
||||
guard.value.is_some(),
|
||||
"semaphore got closed, must be initialized"
|
||||
);
|
||||
return Ok(GuardMut(guard));
|
||||
};
|
||||
|
||||
permit.forget();
|
||||
}
|
||||
|
||||
let permit = InitPermit(sem);
|
||||
let (value, _permit) = factory(permit).await?;
|
||||
|
||||
let sem = {
|
||||
let guard = self.inner.write().await;
|
||||
if guard.value.is_some() {
|
||||
return Ok(GuardMut(guard));
|
||||
}
|
||||
guard.init_semaphore.clone()
|
||||
};
|
||||
|
||||
return Ok(Self::set0(value, guard));
|
||||
let permit = {
|
||||
// increment the count for the duration of queued
|
||||
let _guard = CountWaitingInitializers::start(self);
|
||||
sem.acquire_owned().await
|
||||
};
|
||||
|
||||
match permit {
|
||||
Ok(permit) => {
|
||||
let permit = InitPermit(permit);
|
||||
let (value, _permit) = factory(permit).await?;
|
||||
|
||||
let guard = self.inner.write().await;
|
||||
|
||||
Ok(Self::set0(value, guard))
|
||||
}
|
||||
Err(_closed) => {
|
||||
let guard = self.inner.write().await;
|
||||
assert!(
|
||||
guard.value.is_some(),
|
||||
"semaphore got closed, must be initialized"
|
||||
);
|
||||
return Ok(GuardMut(guard));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,44 +112,37 @@ impl<T> OnceCell<T> {
|
||||
F: FnOnce(InitPermit) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
|
||||
{
|
||||
loop {
|
||||
let sem = {
|
||||
let guard = self.inner.read().await;
|
||||
if guard.value.is_some() {
|
||||
return Ok(GuardRef(guard));
|
||||
}
|
||||
guard.init_semaphore.clone()
|
||||
};
|
||||
|
||||
{
|
||||
let permit = {
|
||||
// increment the count for the duration of queued
|
||||
let _guard = CountWaitingInitializers::start(self);
|
||||
sem.acquire().await
|
||||
};
|
||||
|
||||
let Ok(permit) = permit else {
|
||||
let guard = self.inner.read().await;
|
||||
if !Arc::ptr_eq(&sem, &guard.init_semaphore) {
|
||||
// there was a take_and_deinit in between
|
||||
continue;
|
||||
}
|
||||
assert!(
|
||||
guard.value.is_some(),
|
||||
"semaphore got closed, must be initialized"
|
||||
);
|
||||
return Ok(GuardRef(guard));
|
||||
};
|
||||
|
||||
permit.forget();
|
||||
let sem = {
|
||||
let guard = self.inner.read().await;
|
||||
if guard.value.is_some() {
|
||||
return Ok(GuardRef(guard));
|
||||
}
|
||||
guard.init_semaphore.clone()
|
||||
};
|
||||
|
||||
let permit = InitPermit(sem);
|
||||
let (value, _permit) = factory(permit).await?;
|
||||
let permit = {
|
||||
// increment the count for the duration of queued
|
||||
let _guard = CountWaitingInitializers::start(self);
|
||||
sem.acquire_owned().await
|
||||
};
|
||||
|
||||
let guard = self.inner.write().await;
|
||||
match permit {
|
||||
Ok(permit) => {
|
||||
let permit = InitPermit(permit);
|
||||
let (value, _permit) = factory(permit).await?;
|
||||
|
||||
return Ok(Self::set0(value, guard).downgrade());
|
||||
let guard = self.inner.write().await;
|
||||
|
||||
Ok(Self::set0(value, guard).downgrade())
|
||||
}
|
||||
Err(_closed) => {
|
||||
let guard = self.inner.read().await;
|
||||
assert!(
|
||||
guard.value.is_some(),
|
||||
"semaphore got closed, must be initialized"
|
||||
);
|
||||
return Ok(GuardRef(guard));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,12 +250,15 @@ impl<'a, T> GuardMut<'a, T> {
|
||||
/// [`OnceCell::get_or_init`] will wait on it to complete.
|
||||
pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
|
||||
let mut swapped = Inner::default();
|
||||
let sem = swapped.init_semaphore.clone();
|
||||
sem.try_acquire().expect("we just created this").forget();
|
||||
let permit = swapped
|
||||
.init_semaphore
|
||||
.clone()
|
||||
.try_acquire_owned()
|
||||
.expect("we just created this");
|
||||
std::mem::swap(&mut *self.0, &mut swapped);
|
||||
swapped
|
||||
.value
|
||||
.map(|v| (v, InitPermit(sem)))
|
||||
.map(|v| (v, InitPermit(permit)))
|
||||
.expect("guard is not created unless value has been initialized")
|
||||
}
|
||||
|
||||
@@ -293,23 +282,13 @@ impl<T> std::ops::Deref for GuardRef<'_, T> {
|
||||
}
|
||||
|
||||
/// Type held by OnceCell (de)initializing task.
|
||||
pub struct InitPermit(Arc<tokio::sync::Semaphore>);
|
||||
|
||||
impl Drop for InitPermit {
|
||||
fn drop(&mut self) {
|
||||
debug_assert_eq!(self.0.available_permits(), 0);
|
||||
self.0.add_permits(1);
|
||||
}
|
||||
}
|
||||
pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::Future;
|
||||
|
||||
use super::*;
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
pin::{pin, Pin},
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
time::Duration,
|
||||
};
|
||||
@@ -476,94 +455,4 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(*g, "now initialized");
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn reproduce_init_take_deinit_race() {
|
||||
init_take_deinit_scenario(|cell, factory| {
|
||||
Box::pin(async {
|
||||
cell.get_or_init(factory).await.unwrap();
|
||||
})
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn reproduce_init_take_deinit_race_mut() {
|
||||
init_take_deinit_scenario(|cell, factory| {
|
||||
Box::pin(async {
|
||||
cell.get_mut_or_init(factory).await.unwrap();
|
||||
})
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
type BoxedInitFuture<T, E> = Pin<Box<dyn Future<Output = Result<(T, InitPermit), E>>>>;
|
||||
type BoxedInitFunction<T, E> = Box<dyn Fn(InitPermit) -> BoxedInitFuture<T, E>>;
|
||||
|
||||
/// Reproduce an assertion failure with both initialization methods.
|
||||
///
|
||||
/// This has interesting generics to be generic between `get_or_init` and `get_mut_or_init`.
|
||||
/// Alternative would be a macro_rules! but that is the last resort.
|
||||
async fn init_take_deinit_scenario<F>(init_way: F)
|
||||
where
|
||||
F: for<'a> Fn(
|
||||
&'a OnceCell<&'static str>,
|
||||
BoxedInitFunction<&'static str, Infallible>,
|
||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>>,
|
||||
{
|
||||
let cell = OnceCell::default();
|
||||
|
||||
// acquire the init_semaphore only permit to drive initializing tasks in order to waiting
|
||||
// on the same semaphore.
|
||||
let permit = cell
|
||||
.inner
|
||||
.read()
|
||||
.await
|
||||
.init_semaphore
|
||||
.clone()
|
||||
.try_acquire_owned()
|
||||
.unwrap();
|
||||
|
||||
let mut t1 = pin!(init_way(
|
||||
&cell,
|
||||
Box::new(|permit| Box::pin(async move { Ok(("t1", permit)) })),
|
||||
));
|
||||
|
||||
let mut t2 = pin!(init_way(
|
||||
&cell,
|
||||
Box::new(|permit| Box::pin(async move { Ok(("t2", permit)) })),
|
||||
));
|
||||
|
||||
// drive t2 first to the init_semaphore
|
||||
tokio::select! {
|
||||
_ = &mut t2 => unreachable!("it cannot get permit"),
|
||||
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
|
||||
}
|
||||
|
||||
// followed by t1 in the init_semaphore
|
||||
tokio::select! {
|
||||
_ = &mut t1 => unreachable!("it cannot get permit"),
|
||||
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
|
||||
}
|
||||
|
||||
// now let t2 proceed and initialize
|
||||
drop(permit);
|
||||
t2.await;
|
||||
|
||||
let (s, permit) = { cell.get_mut().await.unwrap().take_and_deinit() };
|
||||
assert_eq!("t2", s);
|
||||
|
||||
// now originally t1 would see the semaphore it has as closed. it cannot yet get a permit from
|
||||
// the new one.
|
||||
tokio::select! {
|
||||
_ = &mut t1 => unreachable!("it cannot get permit"),
|
||||
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
|
||||
}
|
||||
|
||||
// only now we get to initialize it
|
||||
drop(permit);
|
||||
t1.await;
|
||||
|
||||
assert_eq!("t1", *cell.get().await.unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,18 +56,10 @@ pub enum ForceAwaitLogicalSize {
|
||||
|
||||
impl Client {
|
||||
pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
|
||||
Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
|
||||
}
|
||||
|
||||
pub fn from_client(
|
||||
client: reqwest::Client,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<&str>,
|
||||
) -> Self {
|
||||
Self {
|
||||
mgmt_api_endpoint,
|
||||
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
|
||||
client,
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -318,22 +310,6 @@ impl Client {
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn tenant_shard_split(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
req: TenantShardSplitRequest,
|
||||
) -> Result<TenantShardSplitResponse> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/shard_split",
|
||||
self.mgmt_api_endpoint, tenant_shard_id
|
||||
);
|
||||
self.request(Method::PUT, &uri, req)
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn timeline_list(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
|
||||
@@ -274,10 +274,6 @@ fn start_pageserver(
|
||||
set_launch_timestamp_metric(launch_ts);
|
||||
#[cfg(target_os = "linux")]
|
||||
metrics::register_internal(Box::new(metrics::more_process_metrics::Collector::new())).unwrap();
|
||||
metrics::register_internal(Box::new(
|
||||
pageserver::metrics::tokio_epoll_uring::Collector::new(),
|
||||
))
|
||||
.unwrap();
|
||||
pageserver::preinitialize_metrics();
|
||||
|
||||
// If any failpoints were set from FAILPOINTS environment variable,
|
||||
|
||||
@@ -623,7 +623,6 @@ impl std::fmt::Display for EvictionLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct DiskUsageEvictionInfo {
|
||||
/// Timeline's largest layer (remote or resident)
|
||||
pub max_layer_size: Option<u64>,
|
||||
@@ -855,27 +854,19 @@ async fn collect_eviction_candidates(
|
||||
|
||||
let total = tenant_candidates.len();
|
||||
|
||||
let tenant_candidates =
|
||||
tenant_candidates
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, mut candidate)| {
|
||||
// as we iterate this reverse sorted list, the most recently accessed layer will always
|
||||
// be 1.0; this is for us to evict it last.
|
||||
candidate.relative_last_activity =
|
||||
eviction_order.relative_last_activity(total, i);
|
||||
for (i, mut candidate) in tenant_candidates.into_iter().enumerate() {
|
||||
// as we iterate this reverse sorted list, the most recently accessed layer will always
|
||||
// be 1.0; this is for us to evict it last.
|
||||
candidate.relative_last_activity = eviction_order.relative_last_activity(total, i);
|
||||
|
||||
let partition = if cumsum > min_resident_size as i128 {
|
||||
MinResidentSizePartition::Above
|
||||
} else {
|
||||
MinResidentSizePartition::Below
|
||||
};
|
||||
cumsum += i128::from(candidate.layer.get_file_size());
|
||||
|
||||
(partition, candidate)
|
||||
});
|
||||
|
||||
candidates.extend(tenant_candidates);
|
||||
let partition = if cumsum > min_resident_size as i128 {
|
||||
MinResidentSizePartition::Above
|
||||
} else {
|
||||
MinResidentSizePartition::Below
|
||||
};
|
||||
cumsum += i128::from(candidate.layer.get_file_size());
|
||||
candidates.push((partition, candidate));
|
||||
}
|
||||
}
|
||||
|
||||
// Note: the same tenant ID might be hit twice, if it transitions from attached to
|
||||
@@ -891,41 +882,21 @@ async fn collect_eviction_candidates(
|
||||
);
|
||||
|
||||
for secondary_tenant in secondary_tenants {
|
||||
// for secondary tenants we use a sum of on_disk layers and already evicted layers. this is
|
||||
// to prevent repeated disk usage based evictions from completely draining less often
|
||||
// updating secondaries.
|
||||
let (mut layer_info, total_layers) = secondary_tenant.get_layers_for_eviction();
|
||||
|
||||
debug_assert!(
|
||||
total_layers >= layer_info.resident_layers.len(),
|
||||
"total_layers ({total_layers}) must be at least the resident_layers.len() ({})",
|
||||
layer_info.resident_layers.len()
|
||||
);
|
||||
let mut layer_info = secondary_tenant.get_layers_for_eviction();
|
||||
|
||||
layer_info
|
||||
.resident_layers
|
||||
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
|
||||
|
||||
let tenant_candidates =
|
||||
layer_info
|
||||
.resident_layers
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, mut candidate)| {
|
||||
candidate.relative_last_activity =
|
||||
eviction_order.relative_last_activity(total_layers, i);
|
||||
(
|
||||
// Secondary locations' layers are always considered above the min resident size,
|
||||
// i.e. secondary locations are permitted to be trimmed to zero layers if all
|
||||
// the layers have sufficiently old access times.
|
||||
MinResidentSizePartition::Above,
|
||||
candidate,
|
||||
)
|
||||
});
|
||||
|
||||
candidates.extend(tenant_candidates);
|
||||
|
||||
tokio::task::yield_now().await;
|
||||
candidates.extend(layer_info.resident_layers.into_iter().map(|candidate| {
|
||||
(
|
||||
// Secondary locations' layers are always considered above the min resident size,
|
||||
// i.e. secondary locations are permitted to be trimmed to zero layers if all
|
||||
// the layers have sufficiently old access times.
|
||||
MinResidentSizePartition::Above,
|
||||
candidate,
|
||||
)
|
||||
}));
|
||||
}
|
||||
|
||||
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
|
||||
|
||||
@@ -19,14 +19,11 @@ use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::models::TenantDetails;
|
||||
use pageserver_api::models::TenantLocationConfigResponse;
|
||||
use pageserver_api::models::TenantShardLocation;
|
||||
use pageserver_api::models::TenantShardSplitRequest;
|
||||
use pageserver_api::models::TenantShardSplitResponse;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
|
||||
TenantLoadRequest, TenantLocationConfigRequest,
|
||||
};
|
||||
use pageserver_api::shard::ShardCount;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::TimeTravelError;
|
||||
@@ -878,7 +875,7 @@ async fn tenant_reset_handler(
|
||||
let state = get_state(&request);
|
||||
state
|
||||
.tenant_manager
|
||||
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &ctx)
|
||||
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
@@ -1107,25 +1104,6 @@ async fn tenant_size_handler(
|
||||
)
|
||||
}
|
||||
|
||||
async fn tenant_shard_split_handler(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let req: TenantShardSplitRequest = json_request(&mut request).await?;
|
||||
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let state = get_state(&request);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let new_shards = state
|
||||
.tenant_manager
|
||||
.shard_split(tenant_shard_id, ShardCount(req.new_shard_count), &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, TenantShardSplitResponse { new_shards })
|
||||
}
|
||||
|
||||
async fn layer_map_info_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -2085,9 +2063,6 @@ pub fn make_router(
|
||||
.put("/v1/tenant/config", |r| {
|
||||
api_handler(r, update_tenant_config_handler)
|
||||
})
|
||||
.put("/v1/tenant/:tenant_shard_id/shard_split", |r| {
|
||||
api_handler(r, tenant_shard_split_handler)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_shard_id/config", |r| {
|
||||
api_handler(r, get_tenant_config_handler)
|
||||
})
|
||||
|
||||
@@ -2400,72 +2400,6 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
|
||||
}
|
||||
}
|
||||
|
||||
pub mod tokio_epoll_uring {
|
||||
use metrics::UIntGauge;
|
||||
|
||||
pub struct Collector {
|
||||
descs: Vec<metrics::core::Desc>,
|
||||
systems_created: UIntGauge,
|
||||
systems_destroyed: UIntGauge,
|
||||
}
|
||||
|
||||
const NMETRICS: usize = 2;
|
||||
|
||||
impl metrics::core::Collector for Collector {
|
||||
fn desc(&self) -> Vec<&metrics::core::Desc> {
|
||||
self.descs.iter().collect()
|
||||
}
|
||||
|
||||
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
|
||||
let mut mfs = Vec::with_capacity(NMETRICS);
|
||||
let tokio_epoll_uring::metrics::Metrics {
|
||||
systems_created,
|
||||
systems_destroyed,
|
||||
} = tokio_epoll_uring::metrics::global();
|
||||
self.systems_created.set(systems_created);
|
||||
mfs.extend(self.systems_created.collect());
|
||||
self.systems_destroyed.set(systems_destroyed);
|
||||
mfs.extend(self.systems_destroyed.collect());
|
||||
mfs
|
||||
}
|
||||
}
|
||||
|
||||
impl Collector {
|
||||
#[allow(clippy::new_without_default)]
|
||||
pub fn new() -> Self {
|
||||
let mut descs = Vec::new();
|
||||
|
||||
let systems_created = UIntGauge::new(
|
||||
"pageserver_tokio_epoll_uring_systems_created",
|
||||
"counter of tokio-epoll-uring systems that were created",
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(
|
||||
metrics::core::Collector::desc(&systems_created)
|
||||
.into_iter()
|
||||
.cloned(),
|
||||
);
|
||||
|
||||
let systems_destroyed = UIntGauge::new(
|
||||
"pageserver_tokio_epoll_uring_systems_destroyed",
|
||||
"counter of tokio-epoll-uring systems that were destroyed",
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(
|
||||
metrics::core::Collector::desc(&systems_destroyed)
|
||||
.into_iter()
|
||||
.cloned(),
|
||||
);
|
||||
|
||||
Self {
|
||||
descs,
|
||||
systems_created,
|
||||
systems_destroyed,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn preinitialize_metrics() {
|
||||
// Python tests need these and on some we do alerting.
|
||||
//
|
||||
|
||||
@@ -989,17 +989,6 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn create_rel_dir(&mut self, spcnode: Oid, dbnode: Oid) -> anyhow::Result<()> {
|
||||
let buf = RelDirectory::ser(&RelDirectory {
|
||||
rels: HashSet::new(),
|
||||
})?;
|
||||
self.put(
|
||||
rel_dir_to_key(spcnode, dbnode),
|
||||
Value::Image(Bytes::from(buf)),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Store a relmapper file (pg_filenode.map) in the repository
|
||||
pub async fn put_relmap_file(
|
||||
&mut self,
|
||||
@@ -1182,6 +1171,9 @@ impl<'a> DatadirModification<'a> {
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
|
||||
// Update logical database size.
|
||||
self.pending_nblocks -= old_size as i64 - nblocks as i64;
|
||||
}
|
||||
|
||||
@@ -53,7 +53,6 @@ use self::metadata::TimelineMetadata;
|
||||
use self::mgr::GetActiveTenantError;
|
||||
use self::mgr::GetTenantError;
|
||||
use self::mgr::TenantsMap;
|
||||
use self::remote_timeline_client::upload::upload_index_part;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::uninit::TimelineExclusionError;
|
||||
use self::timeline::uninit::TimelineUninitMark;
|
||||
@@ -2398,67 +2397,6 @@ impl Tenant {
|
||||
pub(crate) fn get_generation(&self) -> Generation {
|
||||
self.generation
|
||||
}
|
||||
|
||||
/// This function partially shuts down the tenant (it shuts down the Timelines) and is fallible,
|
||||
/// and can leave the tenant in a bad state if it fails. The caller is responsible for
|
||||
/// resetting this tenant to a valid state if we fail.
|
||||
pub(crate) async fn split_prepare(
|
||||
&self,
|
||||
child_shards: &Vec<TenantShardId>,
|
||||
) -> anyhow::Result<()> {
|
||||
let timelines = self.timelines.lock().unwrap().clone();
|
||||
for timeline in timelines.values() {
|
||||
let Some(tl_client) = &timeline.remote_client else {
|
||||
anyhow::bail!("Remote storage is mandatory");
|
||||
};
|
||||
|
||||
let Some(remote_storage) = &self.remote_storage else {
|
||||
anyhow::bail!("Remote storage is mandatory");
|
||||
};
|
||||
|
||||
// We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
|
||||
// to ensure that they do not start a split if currently in the process of doing these.
|
||||
|
||||
// Upload an index from the parent: this is partly to provide freshness for the
|
||||
// child tenants that will copy it, and partly for general ease-of-debugging: there will
|
||||
// always be a parent shard index in the same generation as we wrote the child shard index.
|
||||
tl_client.schedule_index_upload_for_file_changes()?;
|
||||
tl_client.wait_completion().await?;
|
||||
|
||||
// Shut down the timeline's remote client: this means that the indices we write
|
||||
// for child shards will not be invalidated by the parent shard deleting layers.
|
||||
tl_client.shutdown().await?;
|
||||
|
||||
// Download methods can still be used after shutdown, as they don't flow through the remote client's
|
||||
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
|
||||
// operation is rare, so it's simpler to just download it (and robustly guarantees that the index
|
||||
// we use here really is the remotely persistent one).
|
||||
let result = tl_client
|
||||
.download_index_file(self.cancel.clone())
|
||||
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
|
||||
.await?;
|
||||
let index_part = match result {
|
||||
MaybeDeletedIndexPart::Deleted(_) => {
|
||||
anyhow::bail!("Timeline deletion happened concurrently with split")
|
||||
}
|
||||
MaybeDeletedIndexPart::IndexPart(p) => p,
|
||||
};
|
||||
|
||||
for child_shard in child_shards {
|
||||
upload_index_part(
|
||||
remote_storage,
|
||||
child_shard,
|
||||
&timeline.timeline_id,
|
||||
self.generation,
|
||||
&index_part,
|
||||
&self.cancel,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
|
||||
@@ -3794,10 +3732,6 @@ impl Tenant {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
|
||||
self.tenant_conf.read().unwrap().tenant_conf
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_timeline_and_uninit_mark(
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
//! page server.
|
||||
|
||||
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
|
||||
@@ -23,7 +22,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use utils::{completion, crashsafe};
|
||||
use utils::crashsafe;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
@@ -645,6 +644,8 @@ pub(crate) async fn shutdown_all_tenants() {
|
||||
}
|
||||
|
||||
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
use utils::completion;
|
||||
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
|
||||
@@ -1199,7 +1200,7 @@ impl TenantManager {
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
drop_cache: bool,
|
||||
ctx: &RequestContext,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let Some(old_slot) = slot_guard.get_old_value() else {
|
||||
@@ -1252,7 +1253,7 @@ impl TenantManager {
|
||||
None,
|
||||
self.tenants,
|
||||
SpawnMode::Normal,
|
||||
ctx,
|
||||
&ctx,
|
||||
)?;
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant))?;
|
||||
@@ -1374,164 +1375,6 @@ impl TenantManager {
|
||||
slot_guard.revert();
|
||||
result
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.0))]
|
||||
pub(crate) async fn shard_split(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_shard_count: ShardCount,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<TenantShardId>> {
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
// Plan: identify what the new child shards will be
|
||||
let effective_old_shard_count = std::cmp::max(tenant_shard_id.shard_count.0, 1);
|
||||
if new_shard_count <= ShardCount(effective_old_shard_count) {
|
||||
anyhow::bail!("Requested shard count is not an increase");
|
||||
}
|
||||
let expansion_factor = new_shard_count.0 / effective_old_shard_count;
|
||||
if !expansion_factor.is_power_of_two() {
|
||||
anyhow::bail!("Requested split is not a power of two");
|
||||
}
|
||||
|
||||
let parent_shard_identity = tenant.shard_identity;
|
||||
let parent_tenant_conf = tenant.get_tenant_conf();
|
||||
let parent_generation = tenant.generation;
|
||||
|
||||
let child_shards = tenant_shard_id.split(new_shard_count);
|
||||
tracing::info!(
|
||||
"Shard {} splits into: {}",
|
||||
tenant_shard_id.to_index(),
|
||||
child_shards
|
||||
.iter()
|
||||
.map(|id| format!("{}", id.to_index()))
|
||||
.join(",")
|
||||
);
|
||||
|
||||
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
|
||||
if let Err(e) = tenant.split_prepare(&child_shards).await {
|
||||
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
|
||||
// have been left in a partially-shut-down state.
|
||||
tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
|
||||
self.reset_tenant(tenant_shard_id, false, ctx).await?;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
self.resources.deletion_queue_client.flush_advisory();
|
||||
|
||||
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
|
||||
drop(tenant);
|
||||
let mut parent_slot_guard =
|
||||
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let parent = match parent_slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(t)) => t,
|
||||
Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// tenant_map_acquire_slot never returns InProgress, if a slot was InProgress
|
||||
// it would return an error.
|
||||
unreachable!()
|
||||
}
|
||||
None => {
|
||||
// We don't actually need the parent shard to still be attached to do our work, but it's
|
||||
// a weird enough situation that the caller probably didn't want us to continue working
|
||||
// if they had detached the tenant they requested the split on.
|
||||
anyhow::bail!("Detached parent shard in the middle of split!")
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download
|
||||
// TODO: erase the dentries from the parent
|
||||
|
||||
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
|
||||
// child shards to reach this point.
|
||||
let mut target_lsns = HashMap::new();
|
||||
for timeline in parent.timelines.lock().unwrap().clone().values() {
|
||||
target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
|
||||
}
|
||||
|
||||
// TODO: we should have the parent shard stop its WAL ingest here, it's a waste of resources
|
||||
// and could slow down the children trying to catch up.
|
||||
|
||||
// Phase 3: Spawn the child shards
|
||||
for child_shard in &child_shards {
|
||||
let mut child_shard_identity = parent_shard_identity;
|
||||
child_shard_identity.count = child_shard.shard_count;
|
||||
child_shard_identity.number = child_shard.shard_number;
|
||||
|
||||
let child_location_conf = LocationConf {
|
||||
mode: LocationMode::Attached(AttachedLocationConfig {
|
||||
generation: parent_generation,
|
||||
attach_mode: AttachmentMode::Single,
|
||||
}),
|
||||
shard: child_shard_identity,
|
||||
tenant_conf: parent_tenant_conf,
|
||||
};
|
||||
|
||||
self.upsert_location(
|
||||
*child_shard,
|
||||
child_location_conf,
|
||||
None,
|
||||
SpawnMode::Normal,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Phase 4: wait for child chards WAL ingest to catch up to target LSN
|
||||
for child_shard_id in &child_shards {
|
||||
let child_shard = {
|
||||
let locked = TENANTS.read().unwrap();
|
||||
let peek_slot =
|
||||
tenant_map_peek_slot(&locked, child_shard_id, TenantSlotPeekMode::Read)?;
|
||||
peek_slot.and_then(|s| s.get_attached()).cloned()
|
||||
};
|
||||
if let Some(t) = child_shard {
|
||||
let timelines = t.timelines.lock().unwrap().clone();
|
||||
for timeline in timelines.values() {
|
||||
let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"Waiting for child shard {}/{} to reach target lsn {}...",
|
||||
child_shard_id,
|
||||
timeline.timeline_id,
|
||||
target_lsn
|
||||
);
|
||||
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
|
||||
// Failure here might mean shutdown, in any case this part is an optimization
|
||||
// and we shouldn't hold up the split operation.
|
||||
tracing::warn!(
|
||||
"Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
|
||||
timeline.timeline_id
|
||||
);
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Child shard {}/{} reached target lsn {}",
|
||||
child_shard_id,
|
||||
timeline.timeline_id,
|
||||
target_lsn
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 5: Shut down the parent shard.
|
||||
let (_guard, progress) = completion::channel();
|
||||
match parent.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
Err(other) => {
|
||||
other.wait().await;
|
||||
}
|
||||
}
|
||||
parent_slot_guard.drop_old_value()?;
|
||||
|
||||
// Phase 6: Release the InProgress on the parent shard
|
||||
drop(parent_slot_guard);
|
||||
|
||||
Ok(child_shards)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -2366,6 +2209,8 @@ async fn remove_tenant_from_memory<V, F>(
|
||||
where
|
||||
F: std::future::Future<Output = anyhow::Result<V>>,
|
||||
{
|
||||
use utils::completion;
|
||||
|
||||
let mut slot_guard =
|
||||
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use super::index::LayerFileMetadata;
|
||||
use tracing::info;
|
||||
|
||||
/// Serializes and uploads the given index part data to the remote storage.
|
||||
pub(crate) async fn upload_index_part<'a>(
|
||||
pub(super) async fn upload_index_part<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
|
||||
@@ -160,7 +160,7 @@ impl SecondaryTenant {
|
||||
&self.tenant_shard_id
|
||||
}
|
||||
|
||||
pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> (DiskUsageEvictionInfo, usize) {
|
||||
pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> DiskUsageEvictionInfo {
|
||||
self.detail.lock().unwrap().get_layers_for_eviction(self)
|
||||
}
|
||||
|
||||
|
||||
@@ -146,15 +146,14 @@ impl SecondaryDetail {
|
||||
}
|
||||
}
|
||||
|
||||
/// Additionally returns the total number of layers, used for more stable relative access time
|
||||
/// based eviction.
|
||||
pub(super) fn get_layers_for_eviction(
|
||||
&self,
|
||||
parent: &Arc<SecondaryTenant>,
|
||||
) -> (DiskUsageEvictionInfo, usize) {
|
||||
let mut result = DiskUsageEvictionInfo::default();
|
||||
let mut total_layers = 0;
|
||||
|
||||
) -> DiskUsageEvictionInfo {
|
||||
let mut result = DiskUsageEvictionInfo {
|
||||
max_layer_size: None,
|
||||
resident_layers: Vec::new(),
|
||||
};
|
||||
for (timeline_id, timeline_detail) in &self.timelines {
|
||||
result
|
||||
.resident_layers
|
||||
@@ -170,10 +169,6 @@ impl SecondaryDetail {
|
||||
relative_last_activity: finite_f32::FiniteF32::ZERO,
|
||||
}
|
||||
}));
|
||||
|
||||
// total might be missing currently downloading layers, but as a lower than actual
|
||||
// value it is good enough approximation.
|
||||
total_layers += timeline_detail.on_disk_layers.len() + timeline_detail.evicted_at.len();
|
||||
}
|
||||
result.max_layer_size = result
|
||||
.resident_layers
|
||||
@@ -188,7 +183,7 @@ impl SecondaryDetail {
|
||||
result.resident_layers.len()
|
||||
);
|
||||
|
||||
(result, total_layers)
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -317,7 +312,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
.tenant_manager
|
||||
.get_secondary_tenant_shard(*tenant_shard_id);
|
||||
let Some(tenant) = tenant else {
|
||||
return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
|
||||
{
|
||||
return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(PendingDownload {
|
||||
@@ -392,9 +389,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
}
|
||||
|
||||
CompleteDownload {
|
||||
secondary_state,
|
||||
completed_at: Instant::now(),
|
||||
}
|
||||
secondary_state,
|
||||
completed_at: Instant::now(),
|
||||
}
|
||||
}.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,10 +156,6 @@ impl WalIngest {
|
||||
}
|
||||
} else if pg_version == 15 {
|
||||
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
let createdb = XlCreateDatabaseFromWal::decode(&mut buf);
|
||||
modification
|
||||
.create_rel_dir(createdb.tablespace_id, createdb.db_id)
|
||||
.await?;
|
||||
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
@@ -180,10 +176,6 @@ impl WalIngest {
|
||||
}
|
||||
} else if pg_version == 16 {
|
||||
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
let createdb = XlCreateDatabaseFromWal::decode(&mut buf);
|
||||
modification
|
||||
.create_rel_dir(createdb.tablespace_id, createdb.db_id)
|
||||
.await?;
|
||||
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
|
||||
@@ -521,22 +521,6 @@ impl XlCreateDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlCreateDatabaseFromWal {
|
||||
pub db_id: Oid,
|
||||
pub tablespace_id: Oid,
|
||||
}
|
||||
|
||||
impl XlCreateDatabaseFromWal {
|
||||
pub fn decode(buf: &mut Bytes) -> XlCreateDatabaseFromWal {
|
||||
XlCreateDatabaseFromWal {
|
||||
db_id: buf.get_u32_le(),
|
||||
tablespace_id: buf.get_u32_le(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlDropDatabase {
|
||||
|
||||
@@ -314,9 +314,6 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
lfc_ctl->used -= 1;
|
||||
}
|
||||
lfc_ctl->limit = new_size;
|
||||
if (new_size == 0) {
|
||||
lfc_ctl->generation += 1;
|
||||
}
|
||||
neon_log(DEBUG1, "set local file cache limit to %d", new_size);
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
133
pgxn/neon/neon.c
133
pgxn/neon/neon.c
@@ -11,23 +11,16 @@
|
||||
#include "postgres.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "funcapi.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
@@ -37,130 +30,6 @@
|
||||
PG_MODULE_MAGIC;
|
||||
void _PG_init(void);
|
||||
|
||||
static int logical_replication_max_time_lag = 3600;
|
||||
|
||||
static void
|
||||
InitLogicalReplicationMonitor(void)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.logical_replication_max_time_lag",
|
||||
"Threshold for dropping unused logical replication slots",
|
||||
NULL,
|
||||
&logical_replication_max_time_lag,
|
||||
3600, 0, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_S,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LogicalSlotsMonitorMain");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "Logical replication monitor");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "Logical replication monitor");
|
||||
bgw.bgw_restart_time = 5;
|
||||
bgw.bgw_notify_pid = 0;
|
||||
bgw.bgw_main_arg = (Datum) 0;
|
||||
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NameData name;
|
||||
bool dropped;
|
||||
XLogRecPtr confirmed_flush_lsn;
|
||||
TimestampTz last_updated;
|
||||
} SlotStatus;
|
||||
|
||||
/*
|
||||
* Unused logical replication slots pins WAL and prevents deletion of snapshots.
|
||||
*/
|
||||
PGDLLEXPORT void
|
||||
LogicalSlotsMonitorMain(Datum main_arg)
|
||||
{
|
||||
SlotStatus* slots;
|
||||
TimestampTz now, last_checked;
|
||||
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
||||
pqsignal(SIGTERM, die);
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
slots = (SlotStatus*)calloc(max_replication_slots, sizeof(SlotStatus));
|
||||
last_checked = GetCurrentTimestamp();
|
||||
|
||||
for (;;)
|
||||
{
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
|
||||
logical_replication_max_time_lag*1000/2,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
now = GetCurrentTimestamp();
|
||||
|
||||
if (now - last_checked > logical_replication_max_time_lag*USECS_PER_SEC)
|
||||
{
|
||||
int n_active_slots = 0;
|
||||
last_checked = now;
|
||||
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
for (int i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||
|
||||
/* Consider only logical repliction slots */
|
||||
if (!s->in_use || !SlotIsLogical(s))
|
||||
continue;
|
||||
|
||||
if (s->active_pid != 0)
|
||||
{
|
||||
n_active_slots += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Check if there was some activity with the slot since last check */
|
||||
if (s->data.confirmed_flush != slots[i].confirmed_flush_lsn)
|
||||
{
|
||||
slots[i].confirmed_flush_lsn = s->data.confirmed_flush;
|
||||
slots[i].last_updated = now;
|
||||
}
|
||||
else if (now - slots[i].last_updated > logical_replication_max_time_lag*USECS_PER_SEC)
|
||||
{
|
||||
slots[i].name = s->data.name;
|
||||
slots[i].dropped = true;
|
||||
}
|
||||
}
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
/*
|
||||
* If there are no active subscriptions, then no new snapshots are generated
|
||||
* and so no need to force slot deletion.
|
||||
*/
|
||||
if (n_active_slots != 0)
|
||||
{
|
||||
for (int i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
if (slots[i].dropped)
|
||||
{
|
||||
elog(LOG, "Drop logical replication slot because it was not update more than %ld seconds",
|
||||
(now - slots[i].last_updated)/USECS_PER_SEC);
|
||||
ReplicationSlotDrop(slots[i].name.data, true);
|
||||
slots[i].dropped = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
@@ -175,8 +44,6 @@ _PG_init(void)
|
||||
pg_init_libpagestore();
|
||||
pg_init_walproposer();
|
||||
|
||||
InitLogicalReplicationMonitor();
|
||||
|
||||
InitControlPlaneConnector();
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
@@ -19,7 +19,6 @@ chrono.workspace = true
|
||||
clap.workspace = true
|
||||
consumption_metrics.workspace = true
|
||||
dashmap.workspace = true
|
||||
env_logger.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hashbrown.workspace = true
|
||||
|
||||
@@ -68,7 +68,6 @@ pub trait TestBackend: Send + Sync + 'static {
|
||||
fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), console::errors::GetAuthInfoError>;
|
||||
fn get_role_secret(&self) -> Result<CachedRoleSecret, console::errors::GetAuthInfoError>;
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BackendType<'_, ()> {
|
||||
@@ -359,17 +358,6 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
}
|
||||
|
||||
impl BackendType<'_, ComputeUserInfo> {
|
||||
pub async fn get_role_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
) -> Result<CachedRoleSecret, GetAuthInfoError> {
|
||||
use BackendType::*;
|
||||
match self {
|
||||
Console(api, user_info) => api.get_role_secret(ctx, user_info).await,
|
||||
Link(_) => Ok(Cached::new_uncached(None)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
|
||||
@@ -167,7 +167,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn validate_password_and_exchange(
|
||||
pub(super) fn validate_password_and_exchange(
|
||||
password: &[u8],
|
||||
secret: AuthSecret,
|
||||
) -> super::Result<sasl::Outcome<ComputeCredentialKeys>> {
|
||||
|
||||
@@ -88,9 +88,6 @@ struct ProxyCliArgs {
|
||||
/// path to directory with TLS certificates for client postgres connections
|
||||
#[clap(long)]
|
||||
certs_dir: Option<String>,
|
||||
/// timeout for the TLS handshake
|
||||
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
|
||||
handshake_timeout: tokio::time::Duration,
|
||||
/// http endpoint to receive periodic metric updates
|
||||
#[clap(long)]
|
||||
metric_collection_endpoint: Option<String>,
|
||||
@@ -168,10 +165,6 @@ struct SqlOverHttpArgs {
|
||||
#[clap(long, default_value_t = 20)]
|
||||
sql_over_http_pool_max_conns_per_endpoint: usize,
|
||||
|
||||
/// How many connections to pool for each endpoint. Excess connections are discarded
|
||||
#[clap(long, default_value_t = 20000)]
|
||||
sql_over_http_pool_max_total_conns: usize,
|
||||
|
||||
/// How long pooled connections should remain idle for before closing
|
||||
#[clap(long, default_value = "5m", value_parser = humantime::parse_duration)]
|
||||
sql_over_http_idle_timeout: tokio::time::Duration,
|
||||
@@ -394,7 +387,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
pool_shards: args.sql_over_http.sql_over_http_pool_shards,
|
||||
idle_timeout: args.sql_over_http.sql_over_http_idle_timeout,
|
||||
opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
|
||||
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
|
||||
},
|
||||
};
|
||||
let authentication_config = AuthenticationConfig {
|
||||
@@ -414,7 +406,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
require_client_ip: args.require_client_ip,
|
||||
disable_ip_check_for_http: args.disable_ip_check_for_http,
|
||||
endpoint_rps_limit,
|
||||
handshake_timeout: args.handshake_timeout,
|
||||
// TODO: add this argument
|
||||
region: args.region.clone(),
|
||||
}));
|
||||
|
||||
@@ -22,7 +22,6 @@ pub struct ProxyConfig {
|
||||
pub disable_ip_check_for_http: bool,
|
||||
pub endpoint_rps_limit: Vec<RateBucketInfo>,
|
||||
pub region: String,
|
||||
pub handshake_timeout: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -188,7 +188,6 @@ impl super::Api for Api {
|
||||
ep,
|
||||
Arc::new(auth_info.allowed_ips),
|
||||
);
|
||||
ctx.set_project_id(project_id);
|
||||
}
|
||||
// When we just got a secret, we don't need to invalidate it.
|
||||
Ok(Cached::new_uncached(auth_info.secret))
|
||||
@@ -222,7 +221,6 @@ impl super::Api for Api {
|
||||
self.caches
|
||||
.project_info
|
||||
.insert_allowed_ips(&project_id, ep, allowed_ips.clone());
|
||||
ctx.set_project_id(project_id);
|
||||
}
|
||||
Ok((
|
||||
Cached::new_uncached(allowed_ips),
|
||||
|
||||
@@ -89,10 +89,6 @@ impl RequestMonitoring {
|
||||
self.project = Some(x.project_id);
|
||||
}
|
||||
|
||||
pub fn set_project_id(&mut self, project_id: ProjectId) {
|
||||
self.project = Some(project_id);
|
||||
}
|
||||
|
||||
pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
|
||||
crate::metrics::CONNECTING_ENDPOINTS
|
||||
.with_label_values(&[self.protocol])
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
use ::metrics::{
|
||||
exponential_buckets, register_histogram, register_histogram_vec, register_hll_vec,
|
||||
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge,
|
||||
register_int_gauge_vec, Histogram, HistogramVec, HyperLogLogVec, IntCounterPairVec,
|
||||
IntCounterVec, IntGauge, IntGaugeVec,
|
||||
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge_vec, Histogram,
|
||||
HistogramVec, HyperLogLogVec, IntCounterPairVec, IntCounterVec, IntGaugeVec,
|
||||
};
|
||||
use metrics::{register_int_counter_pair, IntCounterPair};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::time;
|
||||
@@ -114,44 +112,6 @@ pub static ALLOWED_IPS_NUMBER: Lazy<Histogram> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static HTTP_CONTENT_LENGTH: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"proxy_http_conn_content_length_bytes",
|
||||
"Time it took for proxy to establish a connection to the compute endpoint",
|
||||
// largest bucket = 3^16 * 0.05ms = 2.15s
|
||||
exponential_buckets(8.0, 2.0, 20).unwrap()
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static GC_LATENCY: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"proxy_http_pool_reclaimation_lag_seconds",
|
||||
"Time it takes to reclaim unused connection pools",
|
||||
// 1us -> 65ms
|
||||
exponential_buckets(1e-6, 2.0, 16).unwrap(),
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static ENDPOINT_POOLS: Lazy<IntCounterPair> = Lazy::new(|| {
|
||||
register_int_counter_pair!(
|
||||
"proxy_http_pool_endpoints_registered_total",
|
||||
"Number of endpoints we have registered pools for",
|
||||
"proxy_http_pool_endpoints_unregistered_total",
|
||||
"Number of endpoints we have unregistered pools for",
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_OPEN_CLIENTS_IN_HTTP_POOL: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"proxy_http_pool_opened_connections",
|
||||
"Number of opened connections to a database.",
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LatencyTimer {
|
||||
// time since the stopwatch was started
|
||||
|
||||
@@ -194,11 +194,10 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
let pause = ctx.latency_timer.pause();
|
||||
let do_handshake = handshake(stream, mode.handshake_tls(tls), &cancel_map);
|
||||
let (mut stream, params) =
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
Some(x) => x,
|
||||
None => return Ok(()), // it's a cancellation request
|
||||
};
|
||||
let (mut stream, params) = match do_handshake.await? {
|
||||
Some(x) => x,
|
||||
None => return Ok(()), // it's a cancellation request
|
||||
};
|
||||
drop(pause);
|
||||
|
||||
let hostname = mode.hostname(stream.get_ref());
|
||||
|
||||
@@ -34,6 +34,21 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg
|
||||
node_info.invalidate().config
|
||||
}
|
||||
|
||||
/// Try to connect to the compute node once.
|
||||
#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
|
||||
async fn connect_to_compute_once(
|
||||
ctx: &mut RequestMonitoring,
|
||||
node_info: &console::CachedNodeInfo,
|
||||
timeout: time::Duration,
|
||||
) -> Result<PostgresConnection, compute::ConnectionError> {
|
||||
let allow_self_signed_compute = node_info.allow_self_signed_compute;
|
||||
|
||||
node_info
|
||||
.config
|
||||
.connect(ctx, allow_self_signed_compute, timeout)
|
||||
.await
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ConnectMechanism {
|
||||
type Connection;
|
||||
@@ -60,18 +75,13 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
type ConnectError = compute::ConnectionError;
|
||||
type Error = compute::ConnectionError;
|
||||
|
||||
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
|
||||
async fn connect_once(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
node_info: &console::CachedNodeInfo,
|
||||
timeout: time::Duration,
|
||||
) -> Result<PostgresConnection, Self::Error> {
|
||||
let allow_self_signed_compute = node_info.allow_self_signed_compute;
|
||||
node_info
|
||||
.config
|
||||
.connect(ctx, allow_self_signed_compute, timeout)
|
||||
.await
|
||||
connect_to_compute_once(ctx, node_info, timeout).await
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
|
||||
|
||||
@@ -478,9 +478,6 @@ impl TestBackend for TestConnectMechanism {
|
||||
{
|
||||
unimplemented!("not used in tests")
|
||||
}
|
||||
fn get_role_secret(&self) -> Result<CachedRoleSecret, console::errors::GetAuthInfoError> {
|
||||
unimplemented!("not used in tests")
|
||||
}
|
||||
}
|
||||
|
||||
fn helper_create_cached_node_info() -> CachedNodeInfo {
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
//!
|
||||
//! Handles both SQL over HTTP and SQL over Websockets.
|
||||
|
||||
mod backend;
|
||||
mod conn_pool;
|
||||
mod json;
|
||||
mod sql_over_http;
|
||||
@@ -19,11 +18,11 @@ pub use reqwest_middleware::{ClientWithMiddleware, Error};
|
||||
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use crate::config::TlsConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;
|
||||
use crate::protocol2::{ProxyProtocolAccept, WithClientIp};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::serverless::backend::PoolingBackend;
|
||||
use crate::{cancellation::CancelMap, config::ProxyConfig};
|
||||
use futures::StreamExt;
|
||||
use hyper::{
|
||||
@@ -55,13 +54,12 @@ pub async fn task_main(
|
||||
info!("websocket server has shut down");
|
||||
}
|
||||
|
||||
let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config);
|
||||
{
|
||||
let conn_pool = Arc::clone(&conn_pool);
|
||||
tokio::spawn(async move {
|
||||
conn_pool.gc_worker(StdRng::from_entropy()).await;
|
||||
});
|
||||
}
|
||||
let conn_pool = conn_pool::GlobalConnPool::new(config);
|
||||
|
||||
let conn_pool2 = Arc::clone(&conn_pool);
|
||||
tokio::spawn(async move {
|
||||
conn_pool2.gc_worker(StdRng::from_entropy()).await;
|
||||
});
|
||||
|
||||
// shutdown the connection pool
|
||||
tokio::spawn({
|
||||
@@ -75,11 +73,6 @@ pub async fn task_main(
|
||||
}
|
||||
});
|
||||
|
||||
let backend = Arc::new(PoolingBackend {
|
||||
pool: Arc::clone(&conn_pool),
|
||||
config,
|
||||
});
|
||||
|
||||
let tls_config = match config.tls_config.as_ref() {
|
||||
Some(config) => config,
|
||||
None => {
|
||||
@@ -113,7 +106,7 @@ pub async fn task_main(
|
||||
let client_addr = io.client_addr();
|
||||
let remote_addr = io.inner.remote_addr();
|
||||
let sni_name = tls.server_name().map(|s| s.to_string());
|
||||
let backend = backend.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
let ws_connections = ws_connections.clone();
|
||||
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
|
||||
|
||||
@@ -126,7 +119,7 @@ pub async fn task_main(
|
||||
Ok(MetricService::new(hyper::service::service_fn(
|
||||
move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
let backend = backend.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
let ws_connections = ws_connections.clone();
|
||||
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
|
||||
|
||||
@@ -137,7 +130,8 @@ pub async fn task_main(
|
||||
request_handler(
|
||||
req,
|
||||
config,
|
||||
backend,
|
||||
tls_config,
|
||||
conn_pool,
|
||||
ws_connections,
|
||||
cancel_map,
|
||||
session_id,
|
||||
@@ -206,7 +200,8 @@ where
|
||||
async fn request_handler(
|
||||
mut request: Request<Body>,
|
||||
config: &'static ProxyConfig,
|
||||
backend: Arc<PoolingBackend>,
|
||||
tls: &'static TlsConfig,
|
||||
conn_pool: Arc<conn_pool::GlobalConnPool>,
|
||||
ws_connections: TaskTracker,
|
||||
cancel_map: Arc<CancelMap>,
|
||||
session_id: uuid::Uuid,
|
||||
@@ -253,7 +248,15 @@ async fn request_handler(
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
|
||||
let mut ctx = RequestMonitoring::new(session_id, peer_addr, "http", &config.region);
|
||||
|
||||
sql_over_http::handle(config, &mut ctx, request, sni_hostname, backend).await
|
||||
sql_over_http::handle(
|
||||
tls,
|
||||
&config.http_config,
|
||||
&mut ctx,
|
||||
request,
|
||||
sni_hostname,
|
||||
conn_pool,
|
||||
)
|
||||
.await
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
|
||||
Response::builder()
|
||||
.header("Allow", "OPTIONS, POST")
|
||||
|
||||
@@ -1,157 +0,0 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use tracing::info;
|
||||
|
||||
use crate::{
|
||||
auth::{backend::ComputeCredentialKeys, check_peer_addr_is_in_list, AuthError},
|
||||
compute,
|
||||
config::ProxyConfig,
|
||||
console::CachedNodeInfo,
|
||||
context::RequestMonitoring,
|
||||
proxy::connect_compute::ConnectMechanism,
|
||||
};
|
||||
|
||||
use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool, APP_NAME};
|
||||
|
||||
pub struct PoolingBackend {
|
||||
pub pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
|
||||
pub config: &'static ProxyConfig,
|
||||
}
|
||||
|
||||
impl PoolingBackend {
|
||||
pub async fn authenticate(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
conn_info: &ConnInfo,
|
||||
) -> Result<ComputeCredentialKeys, AuthError> {
|
||||
let user_info = conn_info.user_info.clone();
|
||||
let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
|
||||
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
|
||||
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
|
||||
return Err(AuthError::ip_address_not_allowed());
|
||||
}
|
||||
let cached_secret = match maybe_secret {
|
||||
Some(secret) => secret,
|
||||
None => backend.get_role_secret(ctx).await?,
|
||||
};
|
||||
|
||||
let secret = match cached_secret.value.clone() {
|
||||
Some(secret) => secret,
|
||||
None => {
|
||||
// If we don't have an authentication secret, for the http flow we can just return an error.
|
||||
info!("authentication info not found");
|
||||
return Err(AuthError::auth_failed(&*user_info.user));
|
||||
}
|
||||
};
|
||||
let auth_outcome =
|
||||
crate::auth::validate_password_and_exchange(conn_info.password.as_bytes(), secret)?;
|
||||
match auth_outcome {
|
||||
crate::sasl::Outcome::Success(key) => Ok(key),
|
||||
crate::sasl::Outcome::Failure(reason) => {
|
||||
info!("auth backend failed with an error: {reason}");
|
||||
Err(AuthError::auth_failed(&*conn_info.user_info.user))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wake up the destination if needed. Code here is a bit involved because
|
||||
// we reuse the code from the usual proxy and we need to prepare few structures
|
||||
// that this code expects.
|
||||
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
|
||||
pub async fn connect_to_compute(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
conn_info: ConnInfo,
|
||||
keys: ComputeCredentialKeys,
|
||||
force_new: bool,
|
||||
) -> anyhow::Result<Client<tokio_postgres::Client>> {
|
||||
let maybe_client = if !force_new {
|
||||
info!("pool: looking for an existing connection");
|
||||
self.pool.get(ctx, &conn_info).await?
|
||||
} else {
|
||||
info!("pool: pool is disabled");
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(client) = maybe_client {
|
||||
return Ok(client);
|
||||
}
|
||||
let conn_id = uuid::Uuid::new_v4();
|
||||
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
|
||||
ctx.set_application(Some(APP_NAME));
|
||||
let backend = self
|
||||
.config
|
||||
.auth_backend
|
||||
.as_ref()
|
||||
.map(|_| conn_info.user_info.clone());
|
||||
|
||||
let mut node_info = backend
|
||||
.wake_compute(ctx)
|
||||
.await?
|
||||
.context("missing cache entry from wake_compute")?;
|
||||
|
||||
match keys {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
ComputeCredentialKeys::Password(password) => node_info.config.password(password),
|
||||
ComputeCredentialKeys::AuthKeys(auth_keys) => node_info.config.auth_keys(auth_keys),
|
||||
};
|
||||
|
||||
ctx.set_project(node_info.aux.clone());
|
||||
|
||||
crate::proxy::connect_compute::connect_to_compute(
|
||||
ctx,
|
||||
&TokioMechanism {
|
||||
conn_id,
|
||||
conn_info,
|
||||
pool: self.pool.clone(),
|
||||
},
|
||||
node_info,
|
||||
&backend,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
struct TokioMechanism {
|
||||
pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
|
||||
conn_info: ConnInfo,
|
||||
conn_id: uuid::Uuid,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ConnectMechanism for TokioMechanism {
|
||||
type Connection = Client<tokio_postgres::Client>;
|
||||
type ConnectError = tokio_postgres::Error;
|
||||
type Error = anyhow::Error;
|
||||
|
||||
async fn connect_once(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
node_info: &CachedNodeInfo,
|
||||
timeout: Duration,
|
||||
) -> Result<Self::Connection, Self::ConnectError> {
|
||||
let mut config = (*node_info.config).clone();
|
||||
let config = config
|
||||
.user(&self.conn_info.user_info.user)
|
||||
.password(&*self.conn_info.password)
|
||||
.dbname(&self.conn_info.dbname)
|
||||
.connect_timeout(timeout);
|
||||
|
||||
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
|
||||
|
||||
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
|
||||
Ok(poll_client(
|
||||
self.pool.clone(),
|
||||
ctx,
|
||||
self.conn_info.clone(),
|
||||
client,
|
||||
connection,
|
||||
self.conn_id,
|
||||
node_info.aux.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -9,23 +9,23 @@ use tokio_postgres::Row;
|
||||
// as parameters.
|
||||
//
|
||||
pub fn json_to_pg_text(json: Vec<Value>) -> Vec<Option<String>> {
|
||||
json.iter().map(json_value_to_pg_text).collect()
|
||||
}
|
||||
json.iter()
|
||||
.map(|value| {
|
||||
match value {
|
||||
// special care for nulls
|
||||
Value::Null => None,
|
||||
|
||||
fn json_value_to_pg_text(value: &Value) -> Option<String> {
|
||||
match value {
|
||||
// special care for nulls
|
||||
Value::Null => None,
|
||||
// convert to text with escaping
|
||||
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
|
||||
|
||||
// convert to text with escaping
|
||||
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
|
||||
// avoid escaping here, as we pass this as a parameter
|
||||
Value::String(s) => Some(s.to_string()),
|
||||
|
||||
// avoid escaping here, as we pass this as a parameter
|
||||
Value::String(s) => Some(s.to_string()),
|
||||
|
||||
// special care for arrays
|
||||
Value::Array(_) => json_array_to_pg_array(value),
|
||||
}
|
||||
// special care for arrays
|
||||
Value::Array(_) => json_array_to_pg_array(value),
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@@ -13,7 +13,6 @@ use hyper::StatusCode;
|
||||
use hyper::{Body, HeaderMap, Request};
|
||||
use serde_json::json;
|
||||
use serde_json::Value;
|
||||
use tokio::join;
|
||||
use tokio_postgres::error::DbError;
|
||||
use tokio_postgres::error::ErrorPosition;
|
||||
use tokio_postgres::GenericClient;
|
||||
@@ -21,7 +20,6 @@ use tokio_postgres::IsolationLevel;
|
||||
use tokio_postgres::ReadyForQueryStatus;
|
||||
use tokio_postgres::Transaction;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::instrument;
|
||||
use url::Url;
|
||||
use utils::http::error::ApiError;
|
||||
@@ -29,25 +27,22 @@ use utils::http::json::json_response;
|
||||
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::endpoint_sni;
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::config::HttpConfig;
|
||||
use crate::config::TlsConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::HTTP_CONTENT_LENGTH;
|
||||
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::RoleName;
|
||||
|
||||
use super::backend::PoolingBackend;
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::json::json_to_pg_text;
|
||||
use super::json::pg_text_row_to_json;
|
||||
use super::conn_pool::GlobalConnPool;
|
||||
use super::json::{json_to_pg_text, pg_text_row_to_json};
|
||||
use super::SERVERLESS_DRIVER_SNI;
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct QueryData {
|
||||
query: String,
|
||||
#[serde(deserialize_with = "bytes_to_pg_text")]
|
||||
params: Vec<Option<String>>,
|
||||
params: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
@@ -74,15 +69,6 @@ static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrab
|
||||
|
||||
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
|
||||
|
||||
fn bytes_to_pg_text<'de, D>(deserializer: D) -> Result<Vec<Option<String>>, D::Error>
|
||||
where
|
||||
D: serde::de::Deserializer<'de>,
|
||||
{
|
||||
// TODO: consider avoiding the allocation here.
|
||||
let json: Vec<Value> = serde::de::Deserialize::deserialize(deserializer)?;
|
||||
Ok(json_to_pg_text(json))
|
||||
}
|
||||
|
||||
fn get_conn_info(
|
||||
ctx: &mut RequestMonitoring,
|
||||
headers: &HeaderMap,
|
||||
@@ -185,15 +171,16 @@ fn check_matches(sni_hostname: &str, hostname: &str) -> Result<bool, anyhow::Err
|
||||
|
||||
// TODO: return different http error codes
|
||||
pub async fn handle(
|
||||
config: &'static ProxyConfig,
|
||||
tls: &'static TlsConfig,
|
||||
config: &'static HttpConfig,
|
||||
ctx: &mut RequestMonitoring,
|
||||
request: Request<Body>,
|
||||
sni_hostname: Option<String>,
|
||||
backend: Arc<PoolingBackend>,
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let result = tokio::time::timeout(
|
||||
config.http_config.request_timeout,
|
||||
handle_inner(config, ctx, request, sni_hostname, backend),
|
||||
config.request_timeout,
|
||||
handle_inner(tls, config, ctx, request, sni_hostname, conn_pool),
|
||||
)
|
||||
.await;
|
||||
let mut response = match result {
|
||||
@@ -278,7 +265,7 @@ pub async fn handle(
|
||||
Err(_) => {
|
||||
let message = format!(
|
||||
"HTTP-Connection timed out, execution time exeeded {} seconds",
|
||||
config.http_config.request_timeout.as_secs()
|
||||
config.request_timeout.as_secs()
|
||||
);
|
||||
error!(message);
|
||||
json_response(
|
||||
@@ -296,36 +283,22 @@ pub async fn handle(
|
||||
|
||||
#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
|
||||
async fn handle_inner(
|
||||
config: &'static ProxyConfig,
|
||||
tls: &'static TlsConfig,
|
||||
config: &'static HttpConfig,
|
||||
ctx: &mut RequestMonitoring,
|
||||
request: Request<Body>,
|
||||
sni_hostname: Option<String>,
|
||||
backend: Arc<PoolingBackend>,
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
) -> anyhow::Result<Response<Body>> {
|
||||
let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
|
||||
.with_label_values(&[ctx.protocol])
|
||||
.with_label_values(&["http"])
|
||||
.guard();
|
||||
info!(
|
||||
protocol = ctx.protocol,
|
||||
"handling interactive connection from client"
|
||||
);
|
||||
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
let headers = request.headers();
|
||||
// TLS config should be there.
|
||||
let conn_info = get_conn_info(
|
||||
ctx,
|
||||
headers,
|
||||
sni_hostname,
|
||||
config.tls_config.as_ref().unwrap(),
|
||||
)?;
|
||||
info!(
|
||||
user = conn_info.user_info.user.as_str(),
|
||||
project = conn_info.user_info.endpoint.as_str(),
|
||||
"credentials"
|
||||
);
|
||||
let conn_info = get_conn_info(ctx, headers, sni_hostname, tls)?;
|
||||
|
||||
// Determine the output options. Default behaviour is 'false'. Anything that is not
|
||||
// strictly 'true' assumed to be false.
|
||||
@@ -334,8 +307,8 @@ async fn handle_inner(
|
||||
|
||||
// Allow connection pooling only if explicitly requested
|
||||
// or if we have decided that http pool is no longer opt-in
|
||||
let allow_pool = !config.http_config.pool_options.opt_in
|
||||
|| headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
|
||||
let allow_pool =
|
||||
!config.pool_options.opt_in || headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
|
||||
|
||||
// isolation level, read only and deferrable
|
||||
|
||||
@@ -360,8 +333,6 @@ async fn handle_inner(
|
||||
None => MAX_REQUEST_SIZE + 1,
|
||||
};
|
||||
drop(paused);
|
||||
info!(request_content_length, "request size in bytes");
|
||||
HTTP_CONTENT_LENGTH.observe(request_content_length as f64);
|
||||
|
||||
// we don't have a streaming request support yet so this is to prevent OOM
|
||||
// from a malicious user sending an extremely large request body
|
||||
@@ -371,28 +342,13 @@ async fn handle_inner(
|
||||
));
|
||||
}
|
||||
|
||||
let fetch_and_process_request = async {
|
||||
let body = hyper::body::to_bytes(request.into_body())
|
||||
.await
|
||||
.map_err(anyhow::Error::from)?;
|
||||
let payload: Payload = serde_json::from_slice(&body)?;
|
||||
Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly
|
||||
};
|
||||
//
|
||||
// Read the query and query params from the request body
|
||||
//
|
||||
let body = hyper::body::to_bytes(request.into_body()).await?;
|
||||
let payload: Payload = serde_json::from_slice(&body)?;
|
||||
|
||||
let authenticate_and_connect = async {
|
||||
let keys = backend.authenticate(ctx, &conn_info).await?;
|
||||
backend
|
||||
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
|
||||
.await
|
||||
};
|
||||
|
||||
// Run both operations in parallel
|
||||
let (payload_result, auth_and_connect_result) =
|
||||
join!(fetch_and_process_request, authenticate_and_connect,);
|
||||
|
||||
// Handle the results
|
||||
let payload = payload_result?; // Handle errors appropriately
|
||||
let mut client = auth_and_connect_result?; // Handle errors appropriately
|
||||
let mut client = conn_pool.get(ctx, conn_info, !allow_pool).await?;
|
||||
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
@@ -526,7 +482,7 @@ async fn query_to_json<T: GenericClient>(
|
||||
raw_output: bool,
|
||||
array_mode: bool,
|
||||
) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
|
||||
let query_params = data.params;
|
||||
let query_params = json_to_pg_text(data.params);
|
||||
let row_stream = client.query_raw_txt(&data.query, query_params).await?;
|
||||
|
||||
// Manually drain the stream into a vector to leave row_stream hanging
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.76.0"
|
||||
channel = "1.75.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -1949,15 +1949,6 @@ class NeonAttachmentService:
|
||||
|
||||
return headers
|
||||
|
||||
def ready(self) -> bool:
|
||||
resp = self.request("GET", f"{self.env.attachment_service_api}/ready")
|
||||
if resp.status_code == 503:
|
||||
return False
|
||||
elif resp.status_code == 200:
|
||||
return True
|
||||
else:
|
||||
raise RuntimeError(f"Unexpected status {resp.status_code} from readiness endpoint")
|
||||
|
||||
def attach_hook_issue(
|
||||
self, tenant_shard_id: Union[TenantId, TenantShardId], pageserver_id: int
|
||||
) -> int:
|
||||
@@ -4063,7 +4054,7 @@ def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -
|
||||
|
||||
|
||||
def tenant_get_shards(
|
||||
env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int] = None
|
||||
env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int]
|
||||
) -> list[tuple[TenantShardId, NeonPageserver]]:
|
||||
"""
|
||||
Helper for when you want to talk to one or more pageservers, and the
|
||||
|
||||
@@ -393,11 +393,11 @@ def test_sql_over_http_batch(static_proxy: NeonProxy):
|
||||
def test_sql_over_http_pool(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
|
||||
|
||||
def get_pid(status: int, pw: str, user="http_auth") -> Any:
|
||||
def get_pid(status: int, pw: str) -> Any:
|
||||
return static_proxy.http_query(
|
||||
GET_CONNECTION_PID_QUERY,
|
||||
[],
|
||||
user=user,
|
||||
user="http_auth",
|
||||
password=pw,
|
||||
expected_code=status,
|
||||
)
|
||||
@@ -418,14 +418,20 @@ def test_sql_over_http_pool(static_proxy: NeonProxy):
|
||||
|
||||
static_proxy.safe_psql("alter user http_auth with password 'http2'")
|
||||
|
||||
# after password change, shouldn't open a new connection because it checks password in proxy.
|
||||
rows = get_pid(200, "http2")["rows"]
|
||||
assert rows == [{"pid": pid1}]
|
||||
# after password change, should open a new connection to verify it
|
||||
pid2 = get_pid(200, "http2")["rows"][0]["pid"]
|
||||
assert pid1 != pid2
|
||||
|
||||
time.sleep(0.02)
|
||||
|
||||
# incorrect user shouldn't reveal that the user doesn't exists
|
||||
res = get_pid(400, "http", user="http_auth2")
|
||||
# query should be on an existing connection
|
||||
pid = get_pid(200, "http2")["rows"][0]["pid"]
|
||||
assert pid in [pid1, pid2]
|
||||
|
||||
time.sleep(0.02)
|
||||
|
||||
# old password should not work
|
||||
res = get_pid(400, "http")
|
||||
assert "password authentication failed for user" in res["message"]
|
||||
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
tenant_get_shards,
|
||||
)
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.types import TimelineId
|
||||
@@ -83,130 +82,4 @@ def test_sharding_smoke(
|
||||
)
|
||||
assert timelines == {env.initial_timeline, timeline_b}
|
||||
|
||||
|
||||
def test_sharding_split_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Test the basics of shard splitting:
|
||||
- The API results in more shards than we started with
|
||||
- The tenant's data remains readable
|
||||
|
||||
"""
|
||||
|
||||
# We will start with 4 shards and split into 8, then migrate all those
|
||||
# 8 shards onto separate pageservers
|
||||
shard_count = 4
|
||||
split_shard_count = 8
|
||||
neon_env_builder.num_pageservers = split_shard_count
|
||||
|
||||
# 1MiB stripes: enable getting some meaningful data distribution without
|
||||
# writing large quantities of data in this test. The stripe size is given
|
||||
# in number of 8KiB pages.
|
||||
stripe_size = 128
|
||||
|
||||
# Use S3-compatible remote storage so that we can scrub: this test validates
|
||||
# that the scrubber doesn't barf when it sees a sharded tenant.
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
neon_env_builder.enable_scrub_on_exit()
|
||||
|
||||
neon_env_builder.preserve_database_files = True
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
|
||||
)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
|
||||
workload.init()
|
||||
|
||||
# Initial data
|
||||
workload.write_rows(256)
|
||||
|
||||
# Note which pageservers initially hold a shard after tenant creation
|
||||
pre_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
|
||||
|
||||
# For pageservers holding a shard, validate their ingest statistics
|
||||
# reflect a proper splitting of the WAL.
|
||||
for pageserver in env.pageservers:
|
||||
if pageserver.id not in pre_split_pageserver_ids:
|
||||
continue
|
||||
|
||||
metrics = pageserver.http_client().get_metrics_values(
|
||||
[
|
||||
"pageserver_wal_ingest_records_received_total",
|
||||
"pageserver_wal_ingest_records_committed_total",
|
||||
"pageserver_wal_ingest_records_filtered_total",
|
||||
]
|
||||
)
|
||||
|
||||
log.info(f"Pageserver {pageserver.id} metrics: {metrics}")
|
||||
|
||||
# Not everything received was committed
|
||||
assert (
|
||||
metrics["pageserver_wal_ingest_records_received_total"]
|
||||
> metrics["pageserver_wal_ingest_records_committed_total"]
|
||||
)
|
||||
|
||||
# Something was committed
|
||||
assert metrics["pageserver_wal_ingest_records_committed_total"] > 0
|
||||
|
||||
# Counts are self consistent
|
||||
assert (
|
||||
metrics["pageserver_wal_ingest_records_received_total"]
|
||||
== metrics["pageserver_wal_ingest_records_committed_total"]
|
||||
+ metrics["pageserver_wal_ingest_records_filtered_total"]
|
||||
)
|
||||
|
||||
# TODO: validate that shards have different sizes
|
||||
|
||||
workload.validate()
|
||||
|
||||
assert len(pre_split_pageserver_ids) == 4
|
||||
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
|
||||
|
||||
post_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
|
||||
# We should have split into 8 shards, on the same 4 pageservers we started on.
|
||||
assert len(post_split_pageserver_ids) == split_shard_count
|
||||
assert len(set(post_split_pageserver_ids)) == shard_count
|
||||
assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids)
|
||||
|
||||
workload.validate()
|
||||
|
||||
workload.churn_rows(256)
|
||||
|
||||
workload.validate()
|
||||
|
||||
# Run GC on all new shards, to check they don't barf or delete anything that breaks reads
|
||||
# (compaction was already run as part of churn_rows)
|
||||
all_shards = tenant_get_shards(env, tenant_id)
|
||||
for tenant_shard_id, pageserver in all_shards:
|
||||
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
|
||||
|
||||
# Restart all nodes, to check that the newly created shards are durable
|
||||
for ps in env.pageservers:
|
||||
ps.restart()
|
||||
|
||||
workload.validate()
|
||||
|
||||
migrate_to_pageserver_ids = list(
|
||||
set(p.id for p in env.pageservers) - set(pre_split_pageserver_ids)
|
||||
)
|
||||
assert len(migrate_to_pageserver_ids) == split_shard_count - shard_count
|
||||
|
||||
# Migrate shards away from the node where the split happened
|
||||
for ps_id in pre_split_pageserver_ids:
|
||||
shards_here = [
|
||||
tenant_shard_id
|
||||
for (tenant_shard_id, pageserver) in all_shards
|
||||
if pageserver.id == ps_id
|
||||
]
|
||||
assert len(shards_here) == 2
|
||||
migrate_shard = shards_here[0]
|
||||
destination = migrate_to_pageserver_ids.pop()
|
||||
|
||||
log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}")
|
||||
env.neon_cli.tenant_migrate(migrate_shard, destination, timeout_secs=10)
|
||||
|
||||
workload.validate()
|
||||
# TODO: test timeline deletion and tenant deletion (depends on change in attachment_service)
|
||||
|
||||
@@ -128,38 +128,6 @@ def test_sharding_service_smoke(
|
||||
assert counts[env.pageservers[2].id] == tenant_shard_count // 2
|
||||
|
||||
|
||||
def test_node_status_after_restart(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Initially we have two online pageservers
|
||||
nodes = env.attachment_service.node_list()
|
||||
assert len(nodes) == 2
|
||||
|
||||
env.pageservers[1].stop()
|
||||
|
||||
env.attachment_service.stop()
|
||||
env.attachment_service.start()
|
||||
|
||||
# Initially readiness check should fail because we're trying to connect to the offline node
|
||||
assert env.attachment_service.ready() is False
|
||||
|
||||
def is_ready():
|
||||
assert env.attachment_service.ready() is True
|
||||
|
||||
wait_until(30, 1, is_ready)
|
||||
|
||||
# We loaded nodes from database on restart
|
||||
nodes = env.attachment_service.node_list()
|
||||
assert len(nodes) == 2
|
||||
|
||||
# We should still be able to create a tenant, because the pageserver which is still online
|
||||
# should have had its availabilty state set to Active.
|
||||
env.attachment_service.tenant_create(TenantId.generate())
|
||||
|
||||
|
||||
def test_sharding_service_passthrough(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user