Compare commits

...

5 Commits

Author SHA1 Message Date
Joonas Koivunen
0678febff8 fix: close semaphore on stop if not already closed 2024-02-05 15:17:39 +02:00
Joonas Koivunen
7c37fad092 nag if shutdown is taking longer than 1s 2024-02-05 15:17:39 +02:00
Joonas Koivunen
c3c9889985 chore: add time at shutdown to Shutdown op
so we can nag if it takes longer than 1s.
2024-02-05 15:17:39 +02:00
Joonas Koivunen
fc88328c05 refactor: looks like a deadlock
however it's not because the acquire_owned will not capture the
environment. better to still grab the semaphore, then execute the
acquire.
2024-02-05 10:51:18 +02:00
Joonas Koivunen
70f646ffe2 More logging fixes (#6584)
I was on-call this week; these would have made me understand more of the
system, faster:
- move stray attaching start logging inside the span it starts, add
generation
- log ancestor timeline_id or bootstrapping in the beginning of timeline
creation
2024-02-05 09:34:03 +02:00
7 changed files with 75 additions and 48 deletions

View File

@@ -489,6 +489,12 @@ async fn timeline_create_handler(
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
if let Some(ancestor_id) = request_data.ancestor_timeline_id.as_ref() {
tracing::info!(%ancestor_id, "starting to branch");
} else {
tracing::info!("bootstrapping");
}
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),

View File

@@ -205,7 +205,7 @@ impl AttachedTenantConf {
match &location_conf.mode {
LocationMode::Attached(attach_conf) => Ok(Self {
tenant_conf: location_conf.tenant_conf,
location: attach_conf.clone(),
location: *attach_conf,
}),
LocationMode::Secondary(_) => {
anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
@@ -625,6 +625,9 @@ impl Tenant {
deletion_queue_client,
} = resources;
let attach_mode = attached_conf.location.attach_mode;
let generation = attached_conf.location.generation;
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
conf,
@@ -654,6 +657,12 @@ impl Tenant {
"attach tenant",
false,
async move {
info!(
?attach_mode,
"Attaching tenant"
);
let _gate_guard = attach_gate_guard;
// Is this tenant being spawned as part of process startup?
@@ -865,7 +874,7 @@ impl Tenant {
Ok(())
}
.instrument({
let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation);
span.follows_from(Span::current());
span
}),
@@ -2354,12 +2363,7 @@ impl Tenant {
}
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
self.tenant_conf
.read()
.unwrap()
.location
.attach_mode
.clone()
self.tenant_conf.read().unwrap().location.attach_mode
}
/// For API access: generate a LocationConfig equivalent to the one that would be used to
@@ -3225,8 +3229,6 @@ impl Tenant {
.context("branch initial metadata upload")?;
}
info!("branched timeline {dst_id} from {src_id} at {start_lsn}");
Ok(new_timeline)
}
@@ -3444,12 +3446,6 @@ impl Tenant {
// All done!
let timeline = raw_timeline.finish_creation()?;
info!(
"created root timeline {} timeline.lsn {}",
timeline_id,
timeline.get_last_record_lsn()
);
Ok(timeline)
}

View File

@@ -51,7 +51,7 @@ pub mod defaults {
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum AttachmentMode {
/// Our generation is current as far as we know, and as far as we know we are the only attached
/// pageserver. This is the "normal" attachment mode.
@@ -66,7 +66,7 @@ pub(crate) enum AttachmentMode {
Stale,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct AttachedLocationConfig {
pub(crate) generation: Generation,
pub(crate) attach_mode: AttachmentMode,

View File

@@ -607,13 +607,6 @@ pub(crate) fn tenant_spawn(
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
);
info!(
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug(),
generation = ?location_conf.location.generation,
attach_mode = ?location_conf.location.attach_mode,
"Attaching tenant"
);
let tenant = match Tenant::spawn(
conf,
tenant_shard_id,

View File

@@ -942,7 +942,7 @@ impl RemoteTimelineClient {
tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
});
let fut = {
let sem = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = match &mut *guard {
UploadQueue::Stopped(_) => return Ok(()),
@@ -958,25 +958,32 @@ impl RemoteTimelineClient {
// made cancellable.
if !upload_queue.shutting_down {
upload_queue.shutting_down = true;
upload_queue.queued_operations.push_back(UploadOp::Shutdown);
upload_queue
.queued_operations
.push_back(UploadOp::Shutdown {
since: std::time::Instant::now(),
});
// this operation is not counted similar to Barrier
self.launch_queued_tasks(upload_queue);
}
upload_queue.shutdown_ready.clone().acquire_owned()
upload_queue.shutdown_ready.clone()
};
let res = fut.await;
let mut closed = std::pin::pin!(sem.acquire());
let res = tokio::select! {
res = &mut closed => res,
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
tracing::warn!("still waiting for UploadQueueInitialized to shutdown");
closed.await
}
};
scopeguard::ScopeGuard::into_inner(sg);
match res {
Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
Err(_closed) => {
// expected
}
}
res.expect_err("shutdown_ready should not have been added permits, only closed");
self.stop()
}
@@ -1249,8 +1256,7 @@ impl RemoteTimelineClient {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}
UploadOp::Barrier(_) | UploadOp::Shutdown => {
UploadOp::Barrier(_) | UploadOp::Shutdown { .. } => {
upload_queue.inprogress_tasks.is_empty()
}
};
@@ -1265,10 +1271,11 @@ impl RemoteTimelineClient {
break;
}
if let UploadOp::Shutdown = next_op {
if let UploadOp::Shutdown { since } = next_op {
// leave the op in the queue but do not start more tasks; it will be dropped when
// the stop is called.
upload_queue.shutdown_ready.close();
assert!(upload_queue.shutting_down);
Self::communicate_shutdown(&upload_queue.shutdown_ready, since);
break;
}
@@ -1292,7 +1299,9 @@ impl RemoteTimelineClient {
sender.send_replace(());
continue;
}
UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
UploadOp::Shutdown { .. } => {
unreachable!("shutdown is intentionally never popped off")
}
};
// Assign unique ID to this task
@@ -1331,7 +1340,24 @@ impl RemoteTimelineClient {
}
}
///
/// Closes `shutdown_ready` unless it is already closed, and logs a warning when more than one
/// second has passed since `shutting_down_since`.
///
/// Calling this on an already closed semaphore does nothing, so the warning is emitted at most
/// once per semaphore.
fn communicate_shutdown(
    shutdown_ready: &tokio::sync::Semaphore,
    shutting_down_since: &std::time::Instant,
) {
    // A shutdown that takes longer than this is worth nagging about in the logs.
    const NAG_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(1);

    if !shutdown_ready.is_closed() {
        // there cannot be any races because the semaphore is from &mut UploadQueueInitialized
        shutdown_ready.close();

        // Measured after closing, i.e. the time from queueing the shutdown until it was signalled.
        let took = shutting_down_since.elapsed();
        if took > NAG_THRESHOLD {
            tracing::warn!(
                elapsed_ms = took.as_millis(),
                "it took longer than expected to shutdown RemoteTimelineClient"
            );
        }
    }
}
/// Perform an upload task.
///
/// The task is in the `inprogress_tasks` list. This function will try to
@@ -1341,7 +1367,6 @@ impl RemoteTimelineClient {
///
/// The task can be shut down, however. That leads to stopping the whole
/// queue.
///
async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
// Loop to retry until it completes.
loop {
@@ -1428,7 +1453,7 @@ impl RemoteTimelineClient {
.await
.map_err(|e| anyhow::anyhow!(e))
}
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown { .. } => {
// unreachable. Barrier operations are handled synchronously in
// launch_queued_tasks, and Shutdown is intentionally never popped off the queue
warn!("unexpected {unexpected:?} operation in perform_upload_task");
@@ -1525,7 +1550,7 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_deletions -= 1;
None
}
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
UploadOp::Barrier(..) | UploadOp::Shutdown { .. } => unreachable!(),
};
// Launch any queued tasks that were unblocked by this one.
@@ -1580,7 +1605,7 @@ impl RemoteTimelineClient {
reason: "should we track deletes? positive or negative sign?",
},
),
UploadOp::Barrier(..) | UploadOp::Shutdown => {
UploadOp::Barrier(..) | UploadOp::Shutdown { .. } => {
// we do not account these
return None;
}
@@ -1684,6 +1709,13 @@ impl RemoteTimelineClient {
// Tear down queued ops
for op in qi.queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
if let UploadOp::Shutdown { since } = &op {
// just in case we have a waiter on the RemoteTimelineClient::shutdown
assert!(qi.shutting_down);
Self::communicate_shutdown(&qi.shutdown_ready, since);
}
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);

View File

@@ -292,7 +292,7 @@ pub(crate) enum UploadOp {
/// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
/// this is the same as a Barrier.
Shutdown,
Shutdown { since: std::time::Instant },
}
impl std::fmt::Display for UploadOp {
@@ -314,7 +314,7 @@ impl std::fmt::Display for UploadOp {
write!(f, "Delete({} layers)", delete.layers.len())
}
UploadOp::Barrier(_) => write!(f, "Barrier"),
UploadOp::Shutdown => write!(f, "Shutdown"),
UploadOp::Shutdown { since } => write!(f, "Shutdown(since: {:?})", since.elapsed()),
}
}
}

View File

@@ -883,7 +883,7 @@ def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
# Deletion itself won't complete due to our failpoint: Tenant::shutdown can't complete while calculating
# logical size is paused in a failpoint. So instead we will use a log observation to check that
# on-demand activation was triggered by the tenant deletion
log_match = f".*attach{{tenant_id={delete_tenant_id} shard_id=0000}}: Activating tenant \\(on-demand\\).*"
log_match = f".*attach{{tenant_id={delete_tenant_id} shard_id=0000 gen=[0-9a-f]+}}: Activating tenant \\(on-demand\\).*"
def activated_on_demand():
assert env.pageserver.log_contains(log_match) is not None