mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 16:10:37 +00:00
Compare commits
5 Commits
sk/aux_fil
...
joonas/pos
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0678febff8 | ||
|
|
7c37fad092 | ||
|
|
c3c9889985 | ||
|
|
fc88328c05 | ||
|
|
70f646ffe2 |
@@ -489,6 +489,12 @@ async fn timeline_create_handler(
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
if let Some(ancestor_id) = request_data.ancestor_timeline_id.as_ref() {
|
||||
tracing::info!(%ancestor_id, "starting to branch");
|
||||
} else {
|
||||
tracing::info!("bootstrapping");
|
||||
}
|
||||
|
||||
match tenant.create_timeline(
|
||||
new_timeline_id,
|
||||
request_data.ancestor_timeline_id.map(TimelineId::from),
|
||||
|
||||
@@ -205,7 +205,7 @@ impl AttachedTenantConf {
|
||||
match &location_conf.mode {
|
||||
LocationMode::Attached(attach_conf) => Ok(Self {
|
||||
tenant_conf: location_conf.tenant_conf,
|
||||
location: attach_conf.clone(),
|
||||
location: *attach_conf,
|
||||
}),
|
||||
LocationMode::Secondary(_) => {
|
||||
anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
|
||||
@@ -625,6 +625,9 @@ impl Tenant {
|
||||
deletion_queue_client,
|
||||
} = resources;
|
||||
|
||||
let attach_mode = attached_conf.location.attach_mode;
|
||||
let generation = attached_conf.location.generation;
|
||||
|
||||
let tenant = Arc::new(Tenant::new(
|
||||
TenantState::Attaching,
|
||||
conf,
|
||||
@@ -654,6 +657,12 @@ impl Tenant {
|
||||
"attach tenant",
|
||||
false,
|
||||
async move {
|
||||
|
||||
info!(
|
||||
?attach_mode,
|
||||
"Attaching tenant"
|
||||
);
|
||||
|
||||
let _gate_guard = attach_gate_guard;
|
||||
|
||||
// Is this tenant being spawned as part of process startup?
|
||||
@@ -865,7 +874,7 @@ impl Tenant {
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
|
||||
let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation);
|
||||
span.follows_from(Span::current());
|
||||
span
|
||||
}),
|
||||
@@ -2354,12 +2363,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
|
||||
self.tenant_conf
|
||||
.read()
|
||||
.unwrap()
|
||||
.location
|
||||
.attach_mode
|
||||
.clone()
|
||||
self.tenant_conf.read().unwrap().location.attach_mode
|
||||
}
|
||||
|
||||
/// For API access: generate a LocationConfig equivalent to the one that would be used to
|
||||
@@ -3225,8 +3229,6 @@ impl Tenant {
|
||||
.context("branch initial metadata upload")?;
|
||||
}
|
||||
|
||||
info!("branched timeline {dst_id} from {src_id} at {start_lsn}");
|
||||
|
||||
Ok(new_timeline)
|
||||
}
|
||||
|
||||
@@ -3444,12 +3446,6 @@ impl Tenant {
|
||||
// All done!
|
||||
let timeline = raw_timeline.finish_creation()?;
|
||||
|
||||
info!(
|
||||
"created root timeline {} timeline.lsn {}",
|
||||
timeline_id,
|
||||
timeline.get_last_record_lsn()
|
||||
);
|
||||
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
|
||||
@@ -51,7 +51,7 @@ pub mod defaults {
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(crate) enum AttachmentMode {
|
||||
/// Our generation is current as far as we know, and as far as we know we are the only attached
|
||||
/// pageserver. This is the "normal" attachment mode.
|
||||
@@ -66,7 +66,7 @@ pub(crate) enum AttachmentMode {
|
||||
Stale,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(crate) struct AttachedLocationConfig {
|
||||
pub(crate) generation: Generation,
|
||||
pub(crate) attach_mode: AttachmentMode,
|
||||
|
||||
@@ -607,13 +607,6 @@ pub(crate) fn tenant_spawn(
|
||||
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
|
||||
);
|
||||
|
||||
info!(
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
generation = ?location_conf.location.generation,
|
||||
attach_mode = ?location_conf.location.attach_mode,
|
||||
"Attaching tenant"
|
||||
);
|
||||
let tenant = match Tenant::spawn(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
|
||||
@@ -942,7 +942,7 @@ impl RemoteTimelineClient {
|
||||
tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
|
||||
});
|
||||
|
||||
let fut = {
|
||||
let sem = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = match &mut *guard {
|
||||
UploadQueue::Stopped(_) => return Ok(()),
|
||||
@@ -958,25 +958,32 @@ impl RemoteTimelineClient {
|
||||
// made cancellable.
|
||||
if !upload_queue.shutting_down {
|
||||
upload_queue.shutting_down = true;
|
||||
upload_queue.queued_operations.push_back(UploadOp::Shutdown);
|
||||
upload_queue
|
||||
.queued_operations
|
||||
.push_back(UploadOp::Shutdown {
|
||||
since: std::time::Instant::now(),
|
||||
});
|
||||
// this operation is not counted similar to Barrier
|
||||
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
}
|
||||
|
||||
upload_queue.shutdown_ready.clone().acquire_owned()
|
||||
upload_queue.shutdown_ready.clone()
|
||||
};
|
||||
|
||||
let res = fut.await;
|
||||
let mut closed = std::pin::pin!(sem.acquire());
|
||||
|
||||
let res = tokio::select! {
|
||||
res = &mut closed => res,
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
|
||||
tracing::warn!("still waiting for UploadQueueInitialized to shutdown");
|
||||
closed.await
|
||||
}
|
||||
};
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(sg);
|
||||
|
||||
match res {
|
||||
Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"),
|
||||
Err(_closed) => {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
res.expect_err("shutdown_ready should not have been added permits, only closed");
|
||||
|
||||
self.stop()
|
||||
}
|
||||
@@ -1249,8 +1256,7 @@ impl RemoteTimelineClient {
|
||||
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
|
||||
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
|
||||
}
|
||||
|
||||
UploadOp::Barrier(_) | UploadOp::Shutdown => {
|
||||
UploadOp::Barrier(_) | UploadOp::Shutdown { .. } => {
|
||||
upload_queue.inprogress_tasks.is_empty()
|
||||
}
|
||||
};
|
||||
@@ -1265,10 +1271,11 @@ impl RemoteTimelineClient {
|
||||
break;
|
||||
}
|
||||
|
||||
if let UploadOp::Shutdown = next_op {
|
||||
if let UploadOp::Shutdown { since } = next_op {
|
||||
// leave the op in the queue but do not start more tasks; it will be dropped when
|
||||
// the stop is called.
|
||||
upload_queue.shutdown_ready.close();
|
||||
assert!(upload_queue.shutting_down);
|
||||
Self::communicate_shutdown(&upload_queue.shutdown_ready, since);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -1292,7 +1299,9 @@ impl RemoteTimelineClient {
|
||||
sender.send_replace(());
|
||||
continue;
|
||||
}
|
||||
UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
|
||||
UploadOp::Shutdown { .. } => {
|
||||
unreachable!("shutdown is intentionally never popped off")
|
||||
}
|
||||
};
|
||||
|
||||
// Assign unique ID to this task
|
||||
@@ -1331,7 +1340,24 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
fn communicate_shutdown(
|
||||
shutdown_ready: &tokio::sync::Semaphore,
|
||||
shutting_down_since: &std::time::Instant,
|
||||
) {
|
||||
if shutdown_ready.is_closed() {
|
||||
return;
|
||||
}
|
||||
// there cannot be any races because the semaphore is from &mut UploadQueueInitialized
|
||||
shutdown_ready.close();
|
||||
let elapsed = shutting_down_since.elapsed();
|
||||
if elapsed > std::time::Duration::from_secs(1) {
|
||||
tracing::warn!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"it took longer than expected to shutdown RemoteTimelineClient"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform an upload task.
|
||||
///
|
||||
/// The task is in the `inprogress_tasks` list. This function will try to
|
||||
@@ -1341,7 +1367,6 @@ impl RemoteTimelineClient {
|
||||
///
|
||||
/// The task can be shut down, however. That leads to stopping the whole
|
||||
/// queue.
|
||||
///
|
||||
async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
|
||||
// Loop to retry until it completes.
|
||||
loop {
|
||||
@@ -1428,7 +1453,7 @@ impl RemoteTimelineClient {
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
}
|
||||
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
|
||||
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown { .. } => {
|
||||
// unreachable. Barrier operations are handled synchronously in
|
||||
// launch_queued_tasks
|
||||
warn!("unexpected {unexpected:?} operation in perform_upload_task");
|
||||
@@ -1525,7 +1550,7 @@ impl RemoteTimelineClient {
|
||||
upload_queue.num_inprogress_deletions -= 1;
|
||||
None
|
||||
}
|
||||
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
|
||||
UploadOp::Barrier(..) | UploadOp::Shutdown { .. } => unreachable!(),
|
||||
};
|
||||
|
||||
// Launch any queued tasks that were unblocked by this one.
|
||||
@@ -1580,7 +1605,7 @@ impl RemoteTimelineClient {
|
||||
reason: "should we track deletes? positive or negative sign?",
|
||||
},
|
||||
),
|
||||
UploadOp::Barrier(..) | UploadOp::Shutdown => {
|
||||
UploadOp::Barrier(..) | UploadOp::Shutdown { .. } => {
|
||||
// we do not account these
|
||||
return None;
|
||||
}
|
||||
@@ -1684,6 +1709,13 @@ impl RemoteTimelineClient {
|
||||
// Tear down queued ops
|
||||
for op in qi.queued_operations.into_iter() {
|
||||
self.calls_unfinished_metric_end(&op);
|
||||
|
||||
if let UploadOp::Shutdown { since } = &op {
|
||||
// just in case we have a waiter on the RemoteTimelineClient::shutdown
|
||||
assert!(qi.shutting_down);
|
||||
Self::communicate_shutdown(&qi.shutdown_ready, since);
|
||||
}
|
||||
|
||||
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
|
||||
// which is exactly what we want to happen.
|
||||
drop(op);
|
||||
|
||||
@@ -292,7 +292,7 @@ pub(crate) enum UploadOp {
|
||||
|
||||
/// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
|
||||
/// this is the same as a Barrier.
|
||||
Shutdown,
|
||||
Shutdown { since: std::time::Instant },
|
||||
}
|
||||
|
||||
impl std::fmt::Display for UploadOp {
|
||||
@@ -314,7 +314,7 @@ impl std::fmt::Display for UploadOp {
|
||||
write!(f, "Delete({} layers)", delete.layers.len())
|
||||
}
|
||||
UploadOp::Barrier(_) => write!(f, "Barrier"),
|
||||
UploadOp::Shutdown => write!(f, "Shutdown"),
|
||||
UploadOp::Shutdown { since } => write!(f, "Shutdown(since: {:?})", since.elapsed()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -883,7 +883,7 @@ def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
|
||||
# Deletion itself won't complete due to our failpoint: Tenant::shutdown can't complete while calculating
|
||||
# logical size is paused in a failpoint. So instead we will use a log observation to check that
|
||||
# on-demand activation was triggered by the tenant deletion
|
||||
log_match = f".*attach{{tenant_id={delete_tenant_id} shard_id=0000}}: Activating tenant \\(on-demand\\).*"
|
||||
log_match = f".*attach{{tenant_id={delete_tenant_id} shard_id=0000 gen=[0-9a-f]+}}: Activating tenant \\(on-demand\\).*"
|
||||
|
||||
def activated_on_demand():
|
||||
assert env.pageserver.log_contains(log_match) is not None
|
||||
|
||||
Reference in New Issue
Block a user