mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
6 Commits
release-pr
...
jcsp/tenan
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c4b0ecec33 | ||
|
|
a5565cc641 | ||
|
|
1804abd23d | ||
|
|
0d4247d2d8 | ||
|
|
b968ae0aa8 | ||
|
|
f901cd459b |
@@ -654,17 +654,20 @@ fn start_pageserver(
|
||||
None,
|
||||
"libpq endpoint listener",
|
||||
true,
|
||||
async move {
|
||||
page_service::libpq_listener_main(
|
||||
conf,
|
||||
broker_client,
|
||||
pg_auth,
|
||||
pageserver_listener,
|
||||
conf.pg_auth_type,
|
||||
libpq_ctx,
|
||||
task_mgr::shutdown_token(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
let tenant_manager = tenant_manager.clone();
|
||||
async move {
|
||||
page_service::libpq_listener_main(
|
||||
tenant_manager,
|
||||
broker_client,
|
||||
pg_auth,
|
||||
pageserver_listener,
|
||||
conf.pg_auth_type,
|
||||
libpq_ctx,
|
||||
task_mgr::shutdown_token(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -49,7 +49,6 @@ use utils::{
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::import_datadir::import_wal_from_tar;
|
||||
use crate::metrics;
|
||||
@@ -59,10 +58,9 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::get_active_tenant_with_timeout;
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::mgr::TenantManager;
|
||||
use crate::tenant::timeline::WaitLsnError;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
@@ -135,7 +133,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
|
||||
/// Listens for connections, and launches a new handler task for each.
|
||||
///
|
||||
pub async fn libpq_listener_main(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
listener: TcpListener,
|
||||
@@ -180,7 +178,7 @@ pub async fn libpq_listener_main(
|
||||
"serving compute connection task",
|
||||
false,
|
||||
page_service_conn_main(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
broker_client.clone(),
|
||||
local_auth,
|
||||
socket,
|
||||
@@ -203,7 +201,7 @@ pub async fn libpq_listener_main(
|
||||
|
||||
#[instrument(skip_all, fields(peer_addr))]
|
||||
async fn page_service_conn_main(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
socket: tokio::net::TcpStream,
|
||||
@@ -260,7 +258,8 @@ async fn page_service_conn_main(
|
||||
// and create a child per-query context when it invokes process_query.
|
||||
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
|
||||
// and create the per-query context in process_query ourselves.
|
||||
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
|
||||
let mut conn_handler =
|
||||
PageServerHandler::new(tenant_manager, broker_client, auth, connection_ctx);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
|
||||
match pgbackend
|
||||
@@ -291,11 +290,12 @@ struct HandlerTimeline {
|
||||
}
|
||||
|
||||
struct PageServerHandler {
|
||||
_conf: &'static PageServerConf,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
claims: Option<Claims>,
|
||||
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
|
||||
/// The context created for the lifetime of the connection
|
||||
/// services by this PageServerHandler.
|
||||
/// For each query received over the connection,
|
||||
@@ -381,13 +381,13 @@ impl From<WaitLsnError> for QueryError {
|
||||
|
||||
impl PageServerHandler {
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
connection_ctx: RequestContext,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
_conf: conf,
|
||||
tenant_manager,
|
||||
broker_client,
|
||||
auth,
|
||||
claims: None,
|
||||
@@ -552,13 +552,15 @@ impl PageServerHandler {
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
|
||||
let tenant = mgr::get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
ShardSelector::First,
|
||||
ACTIVE_TENANT_TIMEOUT,
|
||||
&task_mgr::shutdown_token(),
|
||||
)
|
||||
.await?;
|
||||
let tenant = self
|
||||
.tenant_manager
|
||||
.get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
ShardSelector::First,
|
||||
ACTIVE_TENANT_TIMEOUT,
|
||||
&task_mgr::shutdown_token(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Make request tracer if needed
|
||||
let mut tracer = if tenant.get_trace_read_requests() {
|
||||
@@ -726,13 +728,15 @@ impl PageServerHandler {
|
||||
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
ShardSelector::Zero,
|
||||
ACTIVE_TENANT_TIMEOUT,
|
||||
&task_mgr::shutdown_token(),
|
||||
)
|
||||
.await?;
|
||||
let tenant = self
|
||||
.tenant_manager
|
||||
.get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
ShardSelector::Zero,
|
||||
ACTIVE_TENANT_TIMEOUT,
|
||||
&task_mgr::shutdown_token(),
|
||||
)
|
||||
.await?;
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.await?;
|
||||
@@ -1370,14 +1374,16 @@ impl PageServerHandler {
|
||||
timeline_id: TimelineId,
|
||||
selector: ShardSelector,
|
||||
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
|
||||
let tenant = get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
selector,
|
||||
ACTIVE_TENANT_TIMEOUT,
|
||||
&task_mgr::shutdown_token(),
|
||||
)
|
||||
.await
|
||||
.map_err(GetActiveTimelineError::Tenant)?;
|
||||
let tenant = self
|
||||
.tenant_manager
|
||||
.get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
selector,
|
||||
ACTIVE_TENANT_TIMEOUT,
|
||||
&task_mgr::shutdown_token(),
|
||||
)
|
||||
.await
|
||||
.map_err(GetActiveTimelineError::Tenant)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
set_tracing_field_shard_id(&timeline);
|
||||
Ok(timeline)
|
||||
@@ -1771,13 +1777,15 @@ where
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let tenant = get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
ShardSelector::Zero,
|
||||
ACTIVE_TENANT_TIMEOUT,
|
||||
&task_mgr::shutdown_token(),
|
||||
)
|
||||
.await?;
|
||||
let tenant = self
|
||||
.tenant_manager
|
||||
.get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
ShardSelector::Zero,
|
||||
ACTIVE_TENANT_TIMEOUT,
|
||||
&task_mgr::shutdown_token(),
|
||||
)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"checkpoint_distance"),
|
||||
RowDescriptor::int8_col(b"checkpoint_timeout"),
|
||||
|
||||
@@ -119,6 +119,7 @@ pub(crate) enum TenantsMapRemoveResult {
|
||||
|
||||
/// When resolving a TenantId to a shard, we may be looking for the 0th
|
||||
/// shard, or we might be looking for whichever shard holds a particular page.
|
||||
#[derive(Copy, Clone)]
|
||||
pub(crate) enum ShardSelector {
|
||||
/// Only return the 0th shard, if it is present. If a non-0th shard is present,
|
||||
/// ignore it.
|
||||
@@ -169,6 +170,14 @@ impl TenantStartupMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Result type for looking up a TenantId to a specific shard
|
||||
enum ShardResolveResult {
|
||||
NotFound,
|
||||
Found(Arc<Tenant>),
|
||||
// Wait for this barrrier, then query again
|
||||
InProgress(utils::completion::Barrier),
|
||||
}
|
||||
|
||||
impl TenantsMap {
|
||||
/// Convenience function for typical usage, where we want to get a `Tenant` object, for
|
||||
/// working with attached tenants. If the TenantId is in the map but in Secondary state,
|
||||
@@ -184,26 +193,40 @@ impl TenantsMap {
|
||||
|
||||
/// A page service client sends a TenantId, and to look up the correct Tenant we must
|
||||
/// resolve this to a fully qualified TenantShardId.
|
||||
///
|
||||
/// During shard splits: we shall see parent shards in InProgress state and skip them, and
|
||||
/// instead match on child shards which should appear in Attached state. Very early in a shard
|
||||
/// split, or in other cases where a shard is InProgress, we will return our own InProgress result
|
||||
/// to instruct the caller to wait for that to finish before querying again.
|
||||
fn resolve_attached_shard(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
selector: ShardSelector,
|
||||
) -> Option<TenantShardId> {
|
||||
) -> ShardResolveResult {
|
||||
let mut want_shard = None;
|
||||
let mut any_in_progress = None;
|
||||
|
||||
match self {
|
||||
TenantsMap::Initializing => None,
|
||||
TenantsMap::Initializing => ShardResolveResult::NotFound,
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
|
||||
for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
|
||||
// Ignore all slots that don't contain an attached tenant
|
||||
let tenant = match &slot.1 {
|
||||
TenantSlot::Attached(t) => t,
|
||||
TenantSlot::InProgress(barrier) => {
|
||||
// We might still find a usable shard, but in case we don't, remember that
|
||||
// we saw at least one InProgress slot, so that we can distinguish this case
|
||||
// from a simple NotFound in our return value.
|
||||
any_in_progress = Some(barrier.clone());
|
||||
continue;
|
||||
}
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
match selector {
|
||||
ShardSelector::First => return Some(*slot.0),
|
||||
ShardSelector::First => return ShardResolveResult::Found(tenant.clone()),
|
||||
ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
|
||||
return Some(*slot.0)
|
||||
return ShardResolveResult::Found(tenant.clone())
|
||||
}
|
||||
ShardSelector::Page(key) => {
|
||||
// First slot we see for this tenant, calculate the expected shard number
|
||||
@@ -214,15 +237,21 @@ impl TenantsMap {
|
||||
}
|
||||
|
||||
if Some(tenant.shard_identity.number) == want_shard {
|
||||
return Some(*slot.0);
|
||||
return ShardResolveResult::Found(tenant.clone());
|
||||
}
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
// Fall through: we didn't find an acceptable shard
|
||||
None
|
||||
// Fall through: we didn't find a slot that was in Attached state & matched our selector. If
|
||||
// we found one or more InProgress slot, indicate to caller that they should retry later. Otherwise
|
||||
// this requested shard simply isn't found.
|
||||
if let Some(barrier) = any_in_progress {
|
||||
ShardResolveResult::InProgress(barrier)
|
||||
} else {
|
||||
ShardResolveResult::NotFound
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1693,7 +1722,7 @@ impl TenantManager {
|
||||
for child_shard_id in &child_shards {
|
||||
let child_shard_id = *child_shard_id;
|
||||
let child_shard = {
|
||||
let locked = TENANTS.read().unwrap();
|
||||
let locked = self.tenants.read().unwrap();
|
||||
let peek_slot =
|
||||
tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
|
||||
peek_slot.and_then(|s| s.get_attached()).cloned()
|
||||
@@ -1926,13 +1955,7 @@ impl TenantManager {
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<(), TenantStateError> {
|
||||
let tmp_path = self
|
||||
.detach_tenant0(
|
||||
conf,
|
||||
&TENANTS,
|
||||
tenant_shard_id,
|
||||
detach_ignored,
|
||||
deletion_queue_client,
|
||||
)
|
||||
.detach_tenant0(conf, tenant_shard_id, detach_ignored, deletion_queue_client)
|
||||
.await?;
|
||||
spawn_background_purge(tmp_path);
|
||||
|
||||
@@ -1942,7 +1965,6 @@ impl TenantManager {
|
||||
async fn detach_tenant0(
|
||||
&self,
|
||||
conf: &'static PageServerConf,
|
||||
tenants: &std::sync::RwLock<TenantsMap>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
@@ -1957,7 +1979,7 @@ impl TenantManager {
|
||||
};
|
||||
|
||||
let removal_result = remove_tenant_from_memory(
|
||||
tenants,
|
||||
self.tenants,
|
||||
tenant_shard_id,
|
||||
tenant_dir_rename_operation(tenant_shard_id),
|
||||
)
|
||||
@@ -1993,7 +2015,7 @@ impl TenantManager {
|
||||
pub(crate) fn list_tenants(
|
||||
&self,
|
||||
) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().unwrap();
|
||||
let tenants = self.tenants.read().unwrap();
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
|
||||
@@ -2103,6 +2125,68 @@ impl TenantManager {
|
||||
|
||||
Ok(reparented)
|
||||
}
|
||||
|
||||
/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
|
||||
/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
|
||||
/// then wait for up to `timeout` (minus however long we waited for the slot).
|
||||
pub(crate) async fn get_active_tenant_with_timeout(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
shard_selector: ShardSelector,
|
||||
timeout: Duration,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Arc<Tenant>, GetActiveTenantError> {
|
||||
let wait_start = Instant::now();
|
||||
let deadline = wait_start + timeout;
|
||||
|
||||
// Resolve TenantId to TenantShardId. This is usually a quick one-shot thing, the loop is
|
||||
// for handling the rare case that the slot we're accessing is InProgress.
|
||||
let tenant_shard = loop {
|
||||
let resolved = {
|
||||
let locked = self.tenants.read().unwrap();
|
||||
locked.resolve_attached_shard(&tenant_id, shard_selector)
|
||||
};
|
||||
match resolved {
|
||||
ShardResolveResult::Found(tenant_shard) => break tenant_shard,
|
||||
ShardResolveResult::NotFound => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
|
||||
tenant_id,
|
||||
)));
|
||||
}
|
||||
ShardResolveResult::InProgress(barrier) => {
|
||||
// We can't authoritatively answer right now: wait for InProgress state
|
||||
// to end, then try again
|
||||
match timeout_cancellable(
|
||||
deadline.duration_since(Instant::now()),
|
||||
cancel,
|
||||
barrier.wait(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
// The barrier completed: proceed around the loop to try looking up again
|
||||
continue;
|
||||
}
|
||||
Err(TimeoutCancellableError::Timeout) => {
|
||||
return Err(GetActiveTenantError::WaitForActiveTimeout {
|
||||
latest_state: None,
|
||||
wait_time: timeout,
|
||||
});
|
||||
}
|
||||
Err(TimeoutCancellableError::Cancelled) => {
|
||||
return Err(GetActiveTenantError::Cancelled);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
tracing::debug!("Waiting for tenant to enter active state...");
|
||||
tenant_shard
|
||||
.wait_to_become_active(deadline.duration_since(Instant::now()))
|
||||
.await?;
|
||||
Ok(tenant_shard)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -2151,105 +2235,6 @@ pub(crate) enum GetActiveTenantError {
|
||||
Broken(String),
|
||||
}
|
||||
|
||||
/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
|
||||
/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
|
||||
/// then wait for up to `timeout` (minus however long we waited for the slot).
|
||||
pub(crate) async fn get_active_tenant_with_timeout(
|
||||
tenant_id: TenantId,
|
||||
shard_selector: ShardSelector,
|
||||
timeout: Duration,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Arc<Tenant>, GetActiveTenantError> {
|
||||
enum WaitFor {
|
||||
Barrier(utils::completion::Barrier),
|
||||
Tenant(Arc<Tenant>),
|
||||
}
|
||||
|
||||
let wait_start = Instant::now();
|
||||
let deadline = wait_start + timeout;
|
||||
|
||||
let (wait_for, tenant_shard_id) = {
|
||||
let locked = TENANTS.read().unwrap();
|
||||
|
||||
// Resolve TenantId to TenantShardId
|
||||
let tenant_shard_id = locked
|
||||
.resolve_attached_shard(&tenant_id, shard_selector)
|
||||
.ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
|
||||
tenant_id,
|
||||
)))?;
|
||||
|
||||
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
|
||||
.map_err(GetTenantError::MapState)?;
|
||||
match peek_slot {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
match tenant.current_state() {
|
||||
TenantState::Active => {
|
||||
// Fast path: we don't need to do any async waiting.
|
||||
return Ok(tenant.clone());
|
||||
}
|
||||
_ => {
|
||||
tenant.activate_now();
|
||||
(WaitFor::Tenant(tenant.clone()), tenant_shard_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(TenantSlot::Secondary(_)) => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
|
||||
tenant_shard_id,
|
||||
)))
|
||||
}
|
||||
Some(TenantSlot::InProgress(barrier)) => {
|
||||
(WaitFor::Barrier(barrier.clone()), tenant_shard_id)
|
||||
}
|
||||
None => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
|
||||
tenant_id,
|
||||
)))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let tenant = match wait_for {
|
||||
WaitFor::Barrier(barrier) => {
|
||||
tracing::debug!("Waiting for tenant InProgress state to pass...");
|
||||
timeout_cancellable(
|
||||
deadline.duration_since(Instant::now()),
|
||||
cancel,
|
||||
barrier.wait(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
|
||||
latest_state: None,
|
||||
wait_time: wait_start.elapsed(),
|
||||
},
|
||||
TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
|
||||
})?;
|
||||
{
|
||||
let locked = TENANTS.read().unwrap();
|
||||
let peek_slot =
|
||||
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
|
||||
.map_err(GetTenantError::MapState)?;
|
||||
match peek_slot {
|
||||
Some(TenantSlot::Attached(tenant)) => tenant.clone(),
|
||||
_ => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
|
||||
tenant_shard_id,
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
WaitFor::Tenant(tenant) => tenant,
|
||||
};
|
||||
|
||||
tracing::debug!("Waiting for tenant to enter active state...");
|
||||
tenant
|
||||
.wait_to_become_active(deadline.duration_since(Instant::now()))
|
||||
.await?;
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum DeleteTimelineError {
|
||||
#[error("Tenant {0}")]
|
||||
|
||||
Reference in New Issue
Block a user