Store operation identifier in IdLockMap on exclusive lock (#7397)

## Problem

Issues around operation and tenant locks would have been hard to debug, since
there was little observability into which operation held a lock and for how long.

## Summary of changes

- As suggested in the issue, a wrapper around `OwnedRwLockWriteGuard` was
added (`WrappedWriteGuard`) that clears the operation currently holding the
exclusive lock when it is dropped.
- The value stored in `IdLockMap` was extended from `RwLock<()>` to
`RwLock<Option<I>>`, so each per-identifier lock also carries the operation
that currently holds it.
- When an exclusive lock is requested, besides returning the lock on that
resource, the requesting operation is recorded in the guard once the lock is
acquired (see the sketch below).
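
The pattern can be sketched roughly as follows. This is a minimal, hedged
illustration rather than the code in this diff: the names `OpLockMap` and
`OpWriteGuard` and the hard-coded 30-second threshold are stand-ins for
`IdLockMap`, `WrappedWriteGuard` and `RECONCILE_TIMEOUT`, and plain
`eprintln!` stands in for `tracing::warn!`.

```rust
// Sketch only: a per-key async RwLock over Option<Op>, plus a write-guard
// wrapper that records the holding operation and clears it on Drop.
// Assumes the tokio crate with the "macros", "rt-multi-thread" and "sync"
// features enabled.
use std::collections::HashMap;
use std::fmt::Display;
use std::hash::Hash;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use tokio::sync::{OwnedRwLockWriteGuard, RwLock};

/// Write guard that remembers which operation holds the lock and clears
/// that record (warning if the lock was held too long) when dropped.
struct OpWriteGuard<Op: Display> {
    guard: OwnedRwLockWriteGuard<Option<Op>>,
    held_since: Instant,
}

impl<Op: Display> Drop for OpWriteGuard<Op> {
    fn drop(&mut self) {
        let held_for = self.held_since.elapsed();
        if held_for > Duration::from_secs(30) {
            // The real code uses tracing::warn! here.
            eprintln!(
                "Lock held by {} for {:?}",
                self.guard.as_ref().unwrap(),
                held_for
            );
        }
        // The exclusive section is over: forget which operation held the lock.
        *self.guard = None;
    }
}

/// Per-identifier lock map: `K` is the resource id, `Op` names the operation.
struct OpLockMap<K, Op> {
    entities: Mutex<HashMap<K, Arc<RwLock<Option<Op>>>>>,
}

impl<K: Eq + Hash, Op: Display> OpLockMap<K, Op> {
    fn new() -> Self {
        Self {
            entities: Mutex::new(HashMap::new()),
        }
    }

    async fn exclusive(&self, key: K, op: Op) -> OpWriteGuard<Op> {
        // Hold the synchronous map lock only long enough to get/create the entry.
        let entry = self.entities.lock().unwrap().entry(key).or_default().clone();
        let mut guard = entry.write_owned().await;
        // Record which operation now holds the exclusive lock.
        *guard = Some(op);
        OpWriteGuard {
            guard,
            held_since: Instant::now(),
        }
    }
}

#[tokio::main]
async fn main() {
    let locks: OpLockMap<u32, &'static str> = OpLockMap::new();
    {
        let held = locks.exclusive(1, "shard_split").await;
        println!("key 1 is locked by: {}", held.guard.as_ref().unwrap());
    } // guard dropped here, so the stored operation is cleared again
}
```

The `trace_exclusive_lock`/`trace_shared_lock` helpers in the diff layer the
other half of the observability on top of this: they time how long a caller
waits for the guard and warn if the wait exceeds the reconcile timeout.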


Closes https://github.com/neondatabase/neon/issues/7108
Author: Jure Bajic
Date: 2024-05-03 10:38:19 +02:00
Committed by: GitHub
Parent: 240efb82f9
Commit: 00423152c6
6 changed files with 348 additions and 80 deletions

Cargo.lock (generated, 2 changes)

@@ -5856,6 +5856,8 @@ dependencies = [
"routerify",
"serde",
"serde_json",
"strum",
"strum_macros",
"thiserror",
"tokio",
"tokio-util",


@@ -40,6 +40,8 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
measured.workspace = true
strum.workspace = true
strum_macros.workspace = true
diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
diesel_migrations = { version = "2.1.0" }


@@ -1,25 +1,64 @@
use std::fmt::Display;
use std::time::Instant;
use std::{collections::HashMap, sync::Arc};
use std::time::Duration;
use crate::service::RECONCILE_TIMEOUT;
const LOCK_TIMEOUT_ALERT_THRESHOLD: Duration = RECONCILE_TIMEOUT;
/// A wrapper around `OwnedRwLockWriteGuard` that clears the operation
/// currently holding the lock when it is dropped.
pub struct WrappedWriteGuard<T: Display> {
guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>,
start: Instant,
}
impl<T: Display> WrappedWriteGuard<T> {
pub fn new(guard: tokio::sync::OwnedRwLockWriteGuard<Option<T>>) -> Self {
Self {
guard,
start: Instant::now(),
}
}
}
impl<T: Display> Drop for WrappedWriteGuard<T> {
fn drop(&mut self) {
let duration = self.start.elapsed();
if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
tracing::warn!(
"Lock on {} was held for {:?}",
self.guard.as_ref().unwrap(),
duration
);
}
*self.guard = None;
}
}
/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
/// is needed at a tenant-wide granularity.
pub(crate) struct IdLockMap<T>
pub(crate) struct IdLockMap<T, I>
where
T: Eq + PartialEq + std::hash::Hash,
{
/// A synchronous lock for getting/setting the async locks that our callers will wait on.
entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<Option<I>>>>>,
}
impl<T> IdLockMap<T>
impl<T, I> IdLockMap<T, I>
where
T: Eq + PartialEq + std::hash::Hash,
I: Display,
{
pub(crate) fn shared(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<Option<I>>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().read_owned()
@@ -28,21 +67,26 @@ where
pub(crate) fn exclusive(
&self,
key: T,
) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
operation: I,
) -> impl std::future::Future<Output = WrappedWriteGuard<I>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default();
entry.clone().write_owned()
let entry = locked.entry(key).or_default().clone();
async move {
let mut guard = WrappedWriteGuard::new(entry.clone().write_owned().await);
*guard.guard = Some(operation);
guard
}
}
/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
/// periodic housekeeping to avoid the map growing indefinitely
pub(crate) fn housekeeping(&self) {
let mut locked = self.entities.lock().unwrap();
locked.retain(|_k, lock| lock.try_write().is_err())
locked.retain(|_k, entry| entry.try_write().is_err())
}
}
impl<T> Default for IdLockMap<T>
impl<T, I> Default for IdLockMap<T, I>
where
T: Eq + PartialEq + std::hash::Hash,
{
@@ -52,3 +96,94 @@ where
}
}
}
pub async fn trace_exclusive_lock<
T: Clone + Display + Eq + PartialEq + std::hash::Hash,
I: Display + Clone,
>(
op_locks: &IdLockMap<T, I>,
key: T,
operation: I,
) -> WrappedWriteGuard<I> {
let start = Instant::now();
let guard = op_locks.exclusive(key.clone(), operation.clone()).await;
let duration = start.elapsed();
if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
tracing::warn!(
"Operation {} on key {} has waited {:?} for exclusive lock",
operation,
key,
duration
);
}
guard
}
pub async fn trace_shared_lock<
T: Clone + Display + Eq + PartialEq + std::hash::Hash,
I: Display,
>(
op_locks: &IdLockMap<T, I>,
key: T,
operation: I,
) -> tokio::sync::OwnedRwLockReadGuard<Option<I>> {
let start = Instant::now();
let guard = op_locks.shared(key.clone()).await;
let duration = start.elapsed();
if duration > LOCK_TIMEOUT_ALERT_THRESHOLD {
tracing::warn!(
"Operation {} on key {} has waited {:?} for shared lock",
operation,
key,
duration
);
}
guard
}
#[cfg(test)]
mod tests {
use super::IdLockMap;
#[derive(Clone, Debug, strum_macros::Display, PartialEq)]
enum Operations {
Op1,
Op2,
}
#[tokio::test]
async fn multiple_shared_locks() {
let id_lock_map: IdLockMap<i32, Operations> = IdLockMap::default();
let shared_lock_1 = id_lock_map.shared(1).await;
let shared_lock_2 = id_lock_map.shared(1).await;
assert!(shared_lock_1.is_none());
assert!(shared_lock_2.is_none());
}
#[tokio::test]
async fn exclusive_locks() {
let id_lock_map = IdLockMap::default();
let resource_id = 1;
{
let _ex_lock = id_lock_map.exclusive(resource_id, Operations::Op1).await;
assert_eq!(_ex_lock.guard.clone().unwrap(), Operations::Op1);
let _ex_lock_2 = tokio::time::timeout(
tokio::time::Duration::from_millis(1),
id_lock_map.exclusive(resource_id, Operations::Op2),
)
.await;
assert!(_ex_lock_2.is_err());
}
let shared_lock_1 = id_lock_map.shared(resource_id).await;
assert!(shared_lock_1.is_none());
}
}


@@ -9,7 +9,7 @@ use std::{
use crate::{
compute_hook::NotifyError,
id_lock_map::IdLockMap,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
@@ -33,6 +33,7 @@ use pageserver_api::{
models::{SecondaryProgress, TenantConfigRequest},
};
use reqwest::StatusCode;
use tracing::instrument;
use crate::pageserver_client::PageserverClient;
use pageserver_api::{
@@ -50,11 +51,11 @@ use pageserver_api::{
},
};
use pageserver_client::mgmt_api;
use tokio::sync::{mpsc::error::TrySendError, OwnedRwLockWriteGuard};
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::{
completion::Barrier,
failpoint_support,
generation::Generation,
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
@@ -79,7 +80,7 @@ const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
// For operations that might be slow, like migrating a tenant with
// some data in it.
const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
pub const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
// If we receive a call using Secondary mode initially, it will omit generation. We will initialize
// tenant shards into this generation, and as long as it remains in this generation, we will accept
@@ -96,6 +97,26 @@ pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
/// (`<https://github.com/neondatabase/neon/issues/7552>`)
pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(300);
#[derive(Clone, strum_macros::Display)]
enum TenantOperations {
Create,
LocationConfig,
ConfigSet,
TimeTravelRemoteStorage,
Delete,
UpdatePolicy,
ShardSplit,
SecondaryDownload,
TimelineCreate,
TimelineDelete,
}
#[derive(Clone, strum_macros::Display)]
enum NodeOperations {
Register,
Configure,
}
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
@@ -231,11 +252,11 @@ pub struct Service {
// Locking on a tenant granularity (covers all shards in the tenant):
// - Take exclusively for rare operations that mutate the tenant's persistent state (e.g. create/delete/split)
// - Take in shared mode for operations that need the set of shards to stay the same to complete reliably (e.g. timeline CRUD)
tenant_op_locks: IdLockMap<TenantId>,
tenant_op_locks: IdLockMap<TenantId, TenantOperations>,
// Locking for node-mutating operations: take exclusively for operations that modify the node's persistent state, or
// that transition it to/from Active.
node_op_locks: IdLockMap<NodeId>,
node_op_locks: IdLockMap<NodeId, NodeOperations>,
// Limit how many Reconcilers we will spawn concurrently
reconciler_concurrency: Arc<tokio::sync::Semaphore>,
@@ -307,7 +328,7 @@ struct TenantShardSplitAbort {
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
/// Until this abort op is complete, no other operations may be done on the tenant
_tenant_lock: tokio::sync::OwnedRwLockWriteGuard<()>,
_tenant_lock: WrappedWriteGuard<TenantOperations>,
}
#[derive(thiserror::Error, Debug)]
@@ -1340,7 +1361,7 @@ impl Service {
async fn node_activate_reconcile(
&self,
mut node: Node,
_lock: &OwnedRwLockWriteGuard<()>,
_lock: &WrappedWriteGuard<NodeOperations>,
) -> Result<(), ApiError> {
// This Node is a mutable local copy: we will set it active so that we can use its
// API client to reconcile with the node. The Node in [`Self::nodes`] will get updated
@@ -1586,11 +1607,12 @@ impl Service {
let tenant_id = create_req.new_tenant_id.tenant_id;
// Exclude any concurrent attempts to create/access the same tenant ID
let _tenant_lock = self
.tenant_op_locks
.exclusive(create_req.new_tenant_id.tenant_id)
.await;
let _tenant_lock = trace_exclusive_lock(
&self.tenant_op_locks,
create_req.new_tenant_id.tenant_id,
TenantOperations::Create,
)
.await;
let (response, waiters) = self.do_tenant_create(create_req).await?;
if let Err(e) = self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
@@ -1929,10 +1951,12 @@ impl Service {
req: TenantLocationConfigRequest,
) -> Result<TenantLocationConfigResponse, ApiError> {
// We require an exclusive lock, because we are updating both persistent and in-memory state
let _tenant_lock = self
.tenant_op_locks
.exclusive(tenant_shard_id.tenant_id)
.await;
let _tenant_lock = trace_exclusive_lock(
&self.tenant_op_locks,
tenant_shard_id.tenant_id,
TenantOperations::LocationConfig,
)
.await;
if !tenant_shard_id.is_unsharded() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
@@ -2050,7 +2074,12 @@ impl Service {
pub(crate) async fn tenant_config_set(&self, req: TenantConfigRequest) -> Result<(), ApiError> {
// We require an exclusive lock, because we are updating persistent and in-memory state
let _tenant_lock = self.tenant_op_locks.exclusive(req.tenant_id).await;
let _tenant_lock = trace_exclusive_lock(
&self.tenant_op_locks,
req.tenant_id,
TenantOperations::ConfigSet,
)
.await;
let tenant_id = req.tenant_id;
let config = req.config;
@@ -2139,7 +2168,12 @@ impl Service {
timestamp: Cow<'_, str>,
done_if_after: Cow<'_, str>,
) -> Result<(), ApiError> {
let _tenant_lock = self.tenant_op_locks.exclusive(tenant_id).await;
let _tenant_lock = trace_exclusive_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimeTravelRemoteStorage,
)
.await;
let node = {
let locked = self.inner.read().unwrap();
@@ -2230,7 +2264,12 @@ impl Service {
tenant_id: TenantId,
wait: Option<Duration>,
) -> Result<(StatusCode, SecondaryProgress), ApiError> {
let _tenant_lock = self.tenant_op_locks.shared(tenant_id).await;
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::SecondaryDownload,
)
.await;
// Acquire lock and yield the collection of shard-node tuples which we will send requests onward to
let targets = {
@@ -2324,7 +2363,8 @@ impl Service {
}
pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
let _tenant_lock = self.tenant_op_locks.exclusive(tenant_id).await;
let _tenant_lock =
trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await;
self.ensure_attached_wait(tenant_id).await?;
@@ -2424,7 +2464,14 @@ impl Service {
req: TenantPolicyRequest,
) -> Result<(), ApiError> {
// We require an exclusive lock, because we are updating persistent and in-memory state
let _tenant_lock = self.tenant_op_locks.exclusive(tenant_id).await;
let _tenant_lock = trace_exclusive_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::UpdatePolicy,
)
.await;
failpoint_support::sleep_millis_async!("tenant-update-policy-exclusive-lock");
let TenantPolicyRequest {
placement,
@@ -2478,7 +2525,12 @@ impl Service {
create_req.new_timeline_id,
);
let _tenant_lock = self.tenant_op_locks.shared(tenant_id).await;
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineCreate,
)
.await;
self.ensure_attached_wait(tenant_id).await?;
@@ -2593,7 +2645,12 @@ impl Service {
timeline_id: TimelineId,
) -> Result<StatusCode, ApiError> {
tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
let _tenant_lock = self.tenant_op_locks.shared(tenant_id).await;
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineDelete,
)
.await;
self.ensure_attached_wait(tenant_id).await?;
@@ -3132,7 +3189,12 @@ impl Service {
) -> Result<TenantShardSplitResponse, ApiError> {
// TODO: return 503 if we get stuck waiting for this lock
// (issue https://github.com/neondatabase/neon/issues/7108)
let _tenant_lock = self.tenant_op_locks.exclusive(tenant_id).await;
let _tenant_lock = trace_exclusive_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::ShardSplit,
)
.await;
let new_shard_count = ShardCount::new(split_req.new_shard_count);
let new_stripe_size = split_req.new_stripe_size;
@@ -3893,9 +3955,13 @@ impl Service {
&self,
register_req: NodeRegisterRequest,
) -> Result<(), ApiError> {
let _node_lock = self.node_op_locks.exclusive(register_req.node_id).await;
let _node_lock = trace_exclusive_lock(
&self.node_op_locks,
register_req.node_id,
NodeOperations::Register,
)
.await;
// Pre-check for an already-existing node
{
let locked = self.inner.read().unwrap();
if let Some(node) = locked.nodes.get(&register_req.node_id) {
@@ -3982,7 +4048,8 @@ impl Service {
availability: Option<NodeAvailability>,
scheduling: Option<NodeSchedulingPolicy>,
) -> Result<(), ApiError> {
let _node_lock = self.node_op_locks.exclusive(node_id).await;
let _node_lock =
trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Configure).await;
if let Some(scheduling) = scheduling {
// Scheduling is a persistent part of Node: we must write updates to the database before


@@ -1959,6 +1959,55 @@ class Pagectl(AbstractNeonCli):
return IndexPartDump.from_json(parsed)
class LogUtils:
"""
A mixin class which provides utilities for inspecting the logs of a service.
"""
def __init__(self, logfile: Path) -> None:
self.logfile = logfile
def assert_log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> Tuple[str, LogCursor]:
"""Convenient for use inside wait_until()"""
res = self.log_contains(pattern, offset=offset)
assert res is not None
return res
def log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> Optional[Tuple[str, LogCursor]]:
"""Check that the log contains a line that matches the given regex"""
logfile = self.logfile
if not logfile.exists():
log.warning(f"Skipping log check: {logfile} does not exist")
return None
contains_re = re.compile(pattern)
# XXX: Our rust logging machinery buffers the messages, so if you
# call this function immediately after it's been logged, there is
# no guarantee it is already present in the log file. This hasn't
# been a problem in practice, our python tests are not fast enough
# to hit that race condition.
skip_until_line_no = 0 if offset is None else offset._line_no
cur_line_no = 0
with logfile.open("r") as f:
for line in f:
if cur_line_no < skip_until_line_no:
cur_line_no += 1
continue
elif contains_re.search(line):
# found it!
cur_line_no += 1
return (line, LogCursor(cur_line_no))
else:
cur_line_no += 1
return None
class StorageControllerApiException(Exception):
def __init__(self, message, status_code: int):
super().__init__(message)
@@ -1966,12 +2015,13 @@ class StorageControllerApiException(Exception):
self.status_code = status_code
class NeonStorageController(MetricsGetter):
class NeonStorageController(MetricsGetter, LogUtils):
def __init__(self, env: NeonEnv, auth_enabled: bool):
self.env = env
self.running = False
self.auth_enabled = auth_enabled
self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS
self.logfile = self.workdir / "storage_controller.log"
def start(self):
assert not self.running
@@ -2295,6 +2345,10 @@ class NeonStorageController(MetricsGetter):
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
@property
def workdir(self) -> Path:
return self.env.repo_dir
def __enter__(self) -> "NeonStorageController":
return self
@@ -2312,7 +2366,7 @@ class LogCursor:
_line_no: int
class NeonPageserver(PgProtocol):
class NeonPageserver(PgProtocol, LogUtils):
"""
An object representing a running pageserver.
"""
@@ -2329,7 +2383,7 @@ class NeonPageserver(PgProtocol):
self.service_port = port
self.config_override = config_override
self.version = env.get_binary_version("pageserver")
self.logfile = self.workdir / "pageserver.log"
# After a test finishes, we will scrape the log to see if there are any
# unexpected error messages. If your test expects an error, add it to
# 'allowed_errors' in the test with something like:
@@ -2469,46 +2523,6 @@ class NeonPageserver(PgProtocol):
value = self.http_client().get_metric_value(metric)
assert value == 0, f"Nonzero {metric} == {value}"
def assert_log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> Tuple[str, LogCursor]:
"""Convenient for use inside wait_until()"""
res = self.log_contains(pattern, offset=offset)
assert res is not None
return res
def log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> Optional[Tuple[str, LogCursor]]:
"""Check that the pageserver log contains a line that matches the given regex"""
logfile = self.workdir / "pageserver.log"
if not logfile.exists():
log.warning(f"Skipping log check: {logfile} does not exist")
return None
contains_re = re.compile(pattern)
# XXX: Our rust logging machinery buffers the messages, so if you
# call this function immediately after it's been logged, there is
# no guarantee it is already present in the log file. This hasn't
# been a problem in practice, our python tests are not fast enough
# to hit that race condition.
skip_until_line_no = 0 if offset is None else offset._line_no
cur_line_no = 0
with logfile.open("r") as f:
for line in f:
if cur_line_no < skip_until_line_no:
cur_line_no += 1
continue
elif contains_re.search(line):
# found it!
cur_line_no += 1
return (line, LogCursor(cur_line_no))
else:
cur_line_no += 1
return None
def tenant_attach(
self,
tenant_id: TenantId,


@@ -1,4 +1,5 @@
import json
import threading
import time
from collections import defaultdict
from datetime import datetime, timezone
@@ -1259,6 +1260,53 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
env.storage_controller.consistency_check()
def test_lock_time_tracing(neon_env_builder: NeonEnvBuilder):
"""
Check that when a lock on a resource (tenant, node) is held for too long,
a warning is traced in the logs.
"""
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
env.storage_controller.allowed_errors.extend(
[
".*Lock on.*",
".*Scheduling is disabled by policy.*",
f".*Operation TimelineCreate on key {tenant_id} has waited.*",
]
)
# Apply failpoint
env.storage_controller.configure_failpoints(
("tenant-update-policy-exclusive-lock", "return(31000)")
)
# This will hold the exclusive lock long enough to cause a warning
def update_tenent_policy():
env.storage_controller.tenant_policy_update(
tenant_id=tenant_id,
body={
"scheduling": "Stop",
},
)
thread_update_tenant_policy = threading.Thread(target=update_tenent_policy)
thread_update_tenant_policy.start()
# Make sure the update policy thread has started
time.sleep(1)
# This will not be able to acquire the shared lock immediately and will log a warning
timeline_id = TimelineId.generate()
env.storage_controller.pageserver_api().timeline_create(
pg_version=PgVersion.NOT_SET, tenant_id=tenant_id, new_timeline_id=timeline_id
)
thread_update_tenant_policy.join(timeout=10)
env.storage_controller.assert_log_contains("Lock on UpdatePolicy was held for")
env.storage_controller.assert_log_contains(
f"Operation TimelineCreate on key {tenant_id} has waited"
)
@pytest.mark.parametrize("remote_storage", [RemoteStorageKind.LOCAL_FS, s3_storage()])
@pytest.mark.parametrize("shard_count", [None, 4])
def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_storage):