Merge branch 'main' into basebackup-import

This commit is contained in:
Bojan Serafimov
2022-06-14 17:05:52 -04:00
41 changed files with 935 additions and 527 deletions

View File

@@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> {
return Ok(());
}
let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith"));
let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".neon"));
let workdir = workdir
.canonicalize()
.with_context(|| format!("Error opening workdir '{}'", workdir.display()))?;

View File

@@ -4,7 +4,7 @@
//! The functions here are responsible for locating the correct layer for the
//! get/put call, tracing timeline branching history as needed.
//!
//! The files are stored in the .zenith/tenants/<tenantid>/timelines/<timelineid>
//! The files are stored in the .neon/tenants/<tenantid>/timelines/<timelineid>
//! directory. See layered_repository/README for how the files are managed.
//! In addition to the layer files, there is a metadata file in the same
//! directory that contains information about the timeline, in particular its
@@ -148,7 +148,7 @@ lazy_static! {
.expect("failed to define a metric");
}
/// Parts of the `.zenith/tenants/<tenantid>/timelines/<timelineid>` directory prefix.
/// Parts of the `.neon/tenants/<tenantid>/timelines/<timelineid>` directory prefix.
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
///

View File

@@ -123,7 +123,7 @@ The files are called "layer files". Each layer file covers a range of keys, and
a range of LSNs (or a single LSN, in case of image layers). You can think of it
as a rectangle in the two-dimensional key-LSN space. The layer files for each
timeline are stored in the timeline's subdirectory under
`.zenith/tenants/<tenantid>/timelines`.
`.neon/tenants/<tenantid>/timelines`.
There are two kind of layer files: images, and delta layers. An image file
contains a snapshot of all keys at a particular LSN, whereas a delta file
@@ -178,7 +178,7 @@ version, and how branching and GC works is still valid.
The full path of a delta file looks like this:
```
.zenith/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_10_000000000169C348_0000000001702000
.neon/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_10_000000000169C348_0000000001702000
```
For simplicity, the examples below use a simplified notation for the

View File

@@ -197,7 +197,7 @@ impl Display for TimelineSyncStatusUpdate {
}
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
type Timeline: Timeline;

View File

@@ -186,8 +186,8 @@ use crate::{
};
use metrics::{
register_histogram_vec, register_int_counter, register_int_gauge, HistogramVec, IntCounter,
IntGauge,
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
HistogramVec, IntCounter, IntCounterVec, IntGauge,
};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
@@ -208,14 +208,17 @@ lazy_static! {
static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!(
"pageserver_remote_storage_image_sync_seconds",
"Time took to synchronize (download or upload) a whole pageserver image. \
Grouped by `operation_kind` (upload|download) and `status` (success|failure)",
&["operation_kind", "status"],
vec![
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0, 6.0, 7.0,
8.0, 9.0, 10.0, 12.5, 15.0, 17.5, 20.0
]
Grouped by tenant and timeline ids, `operation_kind` (upload|download) and `status` (success|failure)",
&["tenant_id", "timeline_id", "operation_kind", "status"],
vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 3.0, 10.0, 20.0]
)
.expect("failed to register pageserver image sync time histogram vec");
static ref REMOTE_INDEX_UPLOAD: IntCounterVec = register_int_counter_vec!(
"pageserver_remote_storage_remote_index_uploads_total",
"Number of remote index uploads",
&["tenant_id", "timeline_id"],
)
.expect("failed to register pageserver remote index upload vec");
}
static SYNC_QUEUE: OnceCell<SyncQueue> = OnceCell::new();
@@ -1146,19 +1149,19 @@ where
.await
{
DownloadedTimeline::Abort => {
register_sync_status(sync_start, task_name, None);
register_sync_status(sync_id, sync_start, task_name, None);
if let Err(e) = index.write().await.set_awaits_download(&sync_id, false) {
error!("Timeline {sync_id} was expected to be in the remote index after a download attempt, but it's absent: {e:?}");
}
}
DownloadedTimeline::FailedAndRescheduled => {
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
}
DownloadedTimeline::Successful(mut download_data) => {
match update_local_metadata(conf, sync_id, current_remote_timeline).await {
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
Ok(()) => {
register_sync_status(sync_start, task_name, Some(true));
register_sync_status(sync_id, sync_start, task_name, Some(true));
return Some(TimelineSyncStatusUpdate::Downloaded);
}
Err(e) => {
@@ -1169,7 +1172,7 @@ where
error!("Failed to update local timeline metadata: {e:?}");
download_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Download(download_data));
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
}
}
}
@@ -1265,14 +1268,14 @@ async fn delete_timeline_data<P, S>(
error!("Failed to update remote timeline {sync_id}: {e:?}");
new_delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(new_delete_data));
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
return;
}
}
timeline_delete.deletion_registered = true;
let sync_status = delete_timeline_layers(storage, sync_queue, sync_id, new_delete_data).await;
register_sync_status(sync_start, task_name, Some(sync_status));
register_sync_status(sync_id, sync_start, task_name, Some(sync_status));
}
async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result<TimelineMetadata> {
@@ -1306,7 +1309,7 @@ async fn upload_timeline_data<P, S>(
.await
{
UploadedTimeline::FailedAndRescheduled => {
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
return;
}
UploadedTimeline::Successful(upload_data) => upload_data,
@@ -1325,13 +1328,13 @@ async fn upload_timeline_data<P, S>(
.await
{
Ok(()) => {
register_sync_status(sync_start, task_name, Some(true));
register_sync_status(sync_id, sync_start, task_name, Some(true));
}
Err(e) => {
error!("Failed to update remote timeline {sync_id}: {e:?}");
uploaded_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Upload(uploaded_data));
register_sync_status(sync_start, task_name, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
}
}
}
@@ -1421,7 +1424,14 @@ where
IndexPart::from_remote_timeline(&timeline_path, updated_remote_timeline)
.context("Failed to create an index part from the updated remote timeline")?;
info!("Uploading remote index for the timeline");
debug!("Uploading remote index for the timeline");
REMOTE_INDEX_UPLOAD
.with_label_values(&[
&sync_id.tenant_id.to_string(),
&sync_id.timeline_id.to_string(),
])
.inc();
upload_index_part(conf, storage, sync_id, new_index_part)
.await
.context("Failed to upload new index part")
@@ -1590,12 +1600,24 @@ fn compare_local_and_remote_timeline(
(initial_timeline_status, awaits_download)
}
fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option<bool>) {
fn register_sync_status(
sync_id: ZTenantTimelineId,
sync_start: Instant,
sync_name: &str,
sync_status: Option<bool>,
) {
let secs_elapsed = sync_start.elapsed().as_secs_f64();
info!("Processed a sync task in {secs_elapsed:.2} seconds");
debug!("Processed a sync task in {secs_elapsed:.2} seconds");
let tenant_id = sync_id.tenant_id.to_string();
let timeline_id = sync_id.timeline_id.to_string();
match sync_status {
Some(true) => IMAGE_SYNC_TIME.with_label_values(&[sync_name, "success"]),
Some(false) => IMAGE_SYNC_TIME.with_label_values(&[sync_name, "failure"]),
Some(true) => {
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "success"])
}
Some(false) => {
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "failure"])
}
None => return,
}
.observe(secs_elapsed)

View File

@@ -4,6 +4,7 @@ use std::{fmt::Debug, path::PathBuf};
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use lazy_static::lazy_static;
use remote_storage::RemoteStorage;
use tokio::fs;
use tracing::{debug, error, info, warn};
@@ -17,6 +18,16 @@ use super::{
use crate::{
config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask,
};
use metrics::{register_int_counter_vec, IntCounterVec};
lazy_static! {
static ref NO_LAYERS_UPLOAD: IntCounterVec = register_int_counter_vec!(
"pageserver_remote_storage_no_layers_uploads_total",
"Number of skipped uploads due to no layers",
&["tenant_id", "timeline_id"],
)
.expect("failed to register pageserver no layers upload vec");
}
/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<P, S>(
@@ -102,7 +113,13 @@ where
.collect::<Vec<_>>();
if layers_to_upload.is_empty() {
info!("No layers to upload after filtering, aborting");
debug!("No layers to upload after filtering, aborting");
NO_LAYERS_UPLOAD
.with_label_values(&[
&sync_id.tenant_id.to_string(),
&sync_id.timeline_id.to_string(),
])
.inc();
return UploadedTimeline::Successful(upload_data);
}

View File

@@ -50,7 +50,10 @@ use crate::thread_mgr::ThreadKind;
use crate::{thread_mgr, DatadirTimelineImpl};
use anyhow::{ensure, Context};
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{Client, SkTimelineInfo, SkTimelineSubscription, SkTimelineSubscriptionKind};
use etcd_broker::{
subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription,
Client,
};
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::cell::Cell;
@@ -68,7 +71,7 @@ use tokio::{
use tracing::*;
use url::Url;
use utils::lsn::Lsn;
use utils::pq_proto::ZenithFeedback;
use utils::pq_proto::ReplicationFeedback;
use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId};
use self::connection_handler::{WalConnectionEvent, WalReceiverConnection};
@@ -403,7 +406,7 @@ async fn timeline_wal_broker_loop_step(
// Endlessly try to subscribe for broker updates for a given timeline.
// If there are no safekeepers to maintain the lease, the timeline subscription will be inavailable in the broker and the operation will fail constantly.
// This is ok, pageservers should anyway try subscribing (with some backoff) since it's the only way they can get the timeline WAL anyway.
let mut broker_subscription: SkTimelineSubscription;
let mut broker_subscription: BrokerSubscription<SkTimelineInfo>;
let mut attempt = 0;
loop {
select! {
@@ -420,9 +423,9 @@ async fn timeline_wal_broker_loop_step(
info!("Broker subscription loop cancelled, shutting down");
return Ok(ControlFlow::Break(()));
},
new_subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates(
new_subscription = etcd_broker::subscribe_for_json_values(
etcd_client,
SkTimelineSubscriptionKind::timeline(broker_prefix.to_owned(), id),
SubscriptionKey::sk_timeline_info(broker_prefix.to_owned(), id),
)
.instrument(info_span!("etcd_subscription")) => match new_subscription {
Ok(new_subscription) => {
@@ -518,7 +521,7 @@ struct WalConnectionData {
safekeeper_id: NodeId,
connection: WalReceiverConnection,
connection_init_time: NaiveDateTime,
last_wal_receiver_data: Option<(ZenithFeedback, NaiveDateTime)>,
last_wal_receiver_data: Option<(ReplicationFeedback, NaiveDateTime)>,
}
#[derive(Debug, PartialEq, Eq)]
@@ -843,7 +846,7 @@ mod tests {
.await;
let now = Utc::now().naive_utc();
dummy_connection_data.last_wal_receiver_data = Some((
ZenithFeedback {
ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: 1,
ps_applylsn: current_lsn,
@@ -1014,7 +1017,7 @@ mod tests {
let time_over_threshold =
Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
dummy_connection_data.last_wal_receiver_data = Some((
ZenithFeedback {
ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: current_lsn.0,
ps_applylsn: 1,

View File

@@ -19,7 +19,7 @@ use tokio_stream::StreamExt;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantTimelineId},
};
@@ -33,7 +33,7 @@ use crate::{
#[derive(Debug, Clone)]
pub enum WalConnectionEvent {
Started,
NewWal(ZenithFeedback),
NewWal(ReplicationFeedback),
End(Result<(), String>),
}
@@ -328,7 +328,7 @@ async fn handle_walreceiver_connection(
// Send zenith feedback message.
// Regular standby_status_update fields are put into this message.
let zenith_status_update = ZenithFeedback {
let zenith_status_update = ReplicationFeedback {
current_timeline_size: timeline.get_current_logical_size() as u64,
ps_writelsn: write_lsn,
ps_flushlsn: flush_lsn,