Merge remote-tracking branch 'origin/main' into problame/batching-sidecar-task

This commit is contained in:
Christian Schwarz
2024-11-30 00:03:30 +01:00
32 changed files with 444 additions and 192 deletions

1
Cargo.lock generated
View File

@@ -5415,6 +5415,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror",
"tikv-jemallocator",
"tokio",
"tokio-io-timeout",
"tokio-postgres",

View File

@@ -37,6 +37,7 @@ use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::{thread, time::Duration};
@@ -322,8 +323,15 @@ fn wait_spec(
} else {
spec_set = false;
}
let connstr = Url::parse(connstr).context("cannot parse connstr as a URL")?;
let conn_conf = postgres::config::Config::from_str(connstr.as_str())
.context("cannot build postgres config from connstr")?;
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
.context("cannot build tokio postgres config from connstr")?;
let compute_node = ComputeNode {
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
connstr,
conn_conf,
tokio_conn_conf,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
pgversion: get_pg_version_string(pgbin),

View File

@@ -21,7 +21,7 @@
//! - Build the image with the following command:
//!
//! ```bash
//! docker buildx build --build-arg DEBIAN_FLAVOR=bullseye-slim --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/Dockerfile.com
//! docker buildx build --platform linux/amd64 --build-arg DEBIAN_VERSION=bullseye --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/compute-node.Dockerfile .
//! docker push localhost:3030/localregistry/compute-node-v14:latest
//! ```
@@ -132,7 +132,8 @@ pub(crate) async fn main() -> anyhow::Result<()> {
//
// Initialize pgdata
//
let pg_version = match get_pg_version(pg_bin_dir.as_str()) {
let pgbin = pg_bin_dir.join("postgres");
let pg_version = match get_pg_version(pgbin.as_ref()) {
PostgresMajorVersion::V14 => 14,
PostgresMajorVersion::V15 => 15,
PostgresMajorVersion::V16 => 16,
@@ -155,7 +156,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
//
// Launch postgres process
//
let mut postgres_proc = tokio::process::Command::new(pg_bin_dir.join("postgres"))
let mut postgres_proc = tokio::process::Command::new(pgbin)
.arg("-D")
.arg(&pgdata_dir)
.args(["-c", "wal_level=minimal"])

View File

@@ -6,7 +6,6 @@ use tokio::{
process::Command,
spawn,
};
use tokio_postgres::connect;
use tokio_stream::{self as stream, StreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::warn;
@@ -16,10 +15,8 @@ use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async, postgr
use compute_api::responses::CatalogObjects;
pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<CatalogObjects> {
let connstr = compute.connstr.clone();
let (client, connection): (tokio_postgres::Client, _) =
connect(connstr.as_str(), NoTls).await?;
let conf = compute.get_tokio_conn_conf(Some("compute_ctl:get_dbs_and_roles"));
let (client, connection): (tokio_postgres::Client, _) = conf.connect(NoTls).await?;
spawn(async move {
if let Err(e) = connection.await {

View File

@@ -9,7 +9,8 @@ use crate::compute::ComputeNode;
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
// Connect to the database.
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
let conf = compute.get_tokio_conn_conf(Some("compute_ctl:availability_checker"));
let (client, connection) = conf.connect(NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}

View File

@@ -20,8 +20,9 @@ use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use nix::unistd::Pid;
use postgres;
use postgres::error::SqlState;
use postgres::{Client, NoTls};
use postgres::NoTls;
use tracing::{debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -58,6 +59,10 @@ pub static PG_PID: AtomicU32 = AtomicU32::new(0);
pub struct ComputeNode {
// Url type maintains proper escaping
pub connstr: url::Url,
// We connect to Postgres from many different places, so build configs once
// and reuse them where needed.
pub conn_conf: postgres::config::Config,
pub tokio_conn_conf: tokio_postgres::config::Config,
pub pgdata: String,
pub pgbin: String,
pub pgversion: String,
@@ -800,10 +805,10 @@ impl ComputeNode {
/// version. In the future, it may upgrade all 3rd-party extensions.
#[instrument(skip_all)]
pub fn post_apply_config(&self) -> Result<()> {
let connstr = self.connstr.clone();
let conf = self.get_conn_conf(Some("compute_ctl:post_apply_config"));
thread::spawn(move || {
let func = || {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
let mut client = conf.connect(NoTls)?;
handle_neon_extension_upgrade(&mut client)
.context("handle_neon_extension_upgrade")?;
Ok::<_, anyhow::Error>(())
@@ -815,12 +820,27 @@ impl ComputeNode {
Ok(())
}
pub fn get_conn_conf(&self, application_name: Option<&str>) -> postgres::Config {
let mut conf = self.conn_conf.clone();
if let Some(application_name) = application_name {
conf.application_name(application_name);
}
conf
}
pub fn get_tokio_conn_conf(&self, application_name: Option<&str>) -> tokio_postgres::Config {
let mut conf = self.tokio_conn_conf.clone();
if let Some(application_name) = application_name {
conf.application_name(application_name);
}
conf
}
async fn get_maintenance_client(
conf: &tokio_postgres::Config,
) -> Result<tokio_postgres::Client> {
let mut conf = conf.clone();
conf.application_name("apply_config");
conf.application_name("compute_ctl:apply_config");
let (client, conn) = match conf.connect(NoTls).await {
// If connection fails, it may be the old node with `zenith_admin` superuser.
@@ -837,6 +857,7 @@ impl ComputeNode {
e
);
let mut zenith_admin_conf = postgres::config::Config::from(conf.clone());
zenith_admin_conf.application_name("compute_ctl:apply_config");
zenith_admin_conf.user("zenith_admin");
let mut client =
@@ -1134,8 +1155,7 @@ impl ComputeNode {
/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
let mut conf = tokio_postgres::Config::from_str(self.connstr.as_str()).unwrap();
conf.application_name("apply_config");
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
let conf = Arc::new(conf);
let spec = Arc::new(
@@ -1161,7 +1181,7 @@ impl ComputeNode {
thread::spawn(move || {
let conf = conf.as_ref().clone();
let mut conf = postgres::config::Config::from(conf);
conf.application_name("migrations");
conf.application_name("compute_ctl:migrations");
let mut client = conf.connect(NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
@@ -1369,9 +1389,9 @@ impl ComputeNode {
}
self.post_apply_config()?;
let connstr = self.connstr.clone();
let conf = self.get_conn_conf(None);
thread::spawn(move || {
let res = get_installed_extensions(&connstr);
let res = get_installed_extensions(conf);
match res {
Ok(extensions) => {
info!(
@@ -1510,7 +1530,8 @@ impl ComputeNode {
/// Select `pg_stat_statements` data and return it as a stringified JSON
pub async fn collect_insights(&self) -> String {
let mut result_rows: Vec<String> = Vec::new();
let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
let conf = self.get_tokio_conn_conf(Some("compute_ctl:collect_insights"));
let connect_result = conf.connect(NoTls).await;
let (client, connection) = connect_result.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
@@ -1636,10 +1657,9 @@ LIMIT 100",
privileges: &[Privilege],
role_name: &PgIdent,
) -> Result<()> {
use tokio_postgres::config::Config;
use tokio_postgres::NoTls;
let mut conf = Config::from_str(self.connstr.as_str()).unwrap();
let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:set_role_grants"));
conf.dbname(db_name);
let (db_client, conn) = conf
@@ -1676,10 +1696,9 @@ LIMIT 100",
db_name: &PgIdent,
ext_version: ExtVersion,
) -> Result<ExtVersion> {
use tokio_postgres::config::Config;
use tokio_postgres::NoTls;
let mut conf = Config::from_str(self.connstr.as_str()).unwrap();
let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:install_extension"));
conf.dbname(db_name);
let (db_client, conn) = conf

View File

@@ -295,12 +295,11 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
return Response::new(Body::from(msg));
}
let connstr = compute.connstr.clone();
let res = task::spawn_blocking(move || {
installed_extensions::get_installed_extensions(&connstr)
})
.await
.unwrap();
let conf = compute.get_conn_conf(None);
let res =
task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf))
.await
.unwrap();
match res {
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),

View File

@@ -10,8 +10,6 @@ use metrics::core::Collector;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;
use crate::pg_helpers::postgres_conf_for_db;
/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
///
@@ -41,14 +39,16 @@ fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
///
/// Same extension can be installed in multiple databases with different versions,
/// we only keep the highest and lowest version across all databases.
pub fn get_installed_extensions(connstr: &url::Url) -> Result<InstalledExtensions> {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<InstalledExtensions> {
conf.application_name("compute_ctl:get_installed_extensions");
let mut client = conf.connect(NoTls)?;
let databases: Vec<String> = list_dbs(&mut client)?;
let mut extensions_map: HashMap<String, InstalledExtension> = HashMap::new();
for db in databases.iter() {
let config = postgres_conf_for_db(connstr, db)?;
let mut db_client = config.connect(NoTls)?;
conf.dbname(db);
let mut db_client = conf.connect(NoTls)?;
let extensions: Vec<(String, String)> = db_client
.query(
"SELECT extname, extversion FROM pg_catalog.pg_extension;",
@@ -82,7 +82,7 @@ pub fn get_installed_extensions(connstr: &url::Url) -> Result<InstalledExtension
}
let res = InstalledExtensions {
extensions: extensions_map.values().cloned().collect(),
extensions: extensions_map.into_values().collect(),
};
Ok(res)

View File

@@ -17,11 +17,8 @@ const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
// should be handled gracefully.
fn watch_compute_activity(compute: &ComputeNode) {
// Suppose that `connstr` doesn't change
let mut connstr = compute.connstr.clone();
connstr
.query_pairs_mut()
.append_pair("application_name", "compute_activity_monitor");
let connstr = connstr.as_str();
let connstr = compute.connstr.clone();
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
@@ -29,7 +26,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
wait_for_postgres_start(compute);
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = Client::connect(connstr, NoTls);
let mut client = conf.connect(NoTls);
let mut sleep = false;
let mut prev_active_time: Option<f64> = None;
@@ -57,7 +54,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
info!("connection to Postgres is closed, trying to reconnect");
// Connection is closed, reconnect and try again.
client = Client::connect(connstr, NoTls);
client = conf.connect(NoTls);
continue;
}
@@ -196,7 +193,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
debug!("could not connect to Postgres: {}, retrying", e);
// Establish a new connection and try again.
client = Client::connect(connstr, NoTls);
client = conf.connect(NoTls);
}
}
}

View File

@@ -35,6 +35,7 @@ use utils::backoff;
use utils::backoff::exponential_backoff_duration_seconds;
use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
use crate::DownloadKind;
use crate::{
config::AzureConfig, error::Cancelled, ConcurrencyLimiter, Download, DownloadError,
DownloadOpts, Listing, ListingMode, ListingObject, RemotePath, RemoteStorage, StorageMetadata,
@@ -49,10 +50,17 @@ pub struct AzureBlobStorage {
concurrency_limiter: ConcurrencyLimiter,
// Per-request timeout. Accessible for tests.
pub timeout: Duration,
// Alternative timeout used for metadata objects which are expected to be small
pub small_timeout: Duration,
}
impl AzureBlobStorage {
pub fn new(azure_config: &AzureConfig, timeout: Duration) -> Result<Self> {
pub fn new(
azure_config: &AzureConfig,
timeout: Duration,
small_timeout: Duration,
) -> Result<Self> {
debug!(
"Creating azure remote storage for azure container {}",
azure_config.container_name
@@ -94,6 +102,7 @@ impl AzureBlobStorage {
max_keys_per_list_response,
concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
timeout,
small_timeout,
})
}
@@ -133,6 +142,7 @@ impl AzureBlobStorage {
async fn download_for_builder(
&self,
builder: GetBlobBuilder,
timeout: Duration,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
let kind = RequestKind::Get;
@@ -156,7 +166,7 @@ impl AzureBlobStorage {
.map_err(to_download_error);
// apply per request timeout
let response = tokio_stream::StreamExt::timeout(response, self.timeout);
let response = tokio_stream::StreamExt::timeout(response, timeout);
// flatten
let response = response.map(|res| match res {
@@ -415,7 +425,7 @@ impl RemoteStorage for AzureBlobStorage {
let blob_client = self.client.blob_client(self.relative_path_to_name(key));
let properties_future = blob_client.get_properties().into_future();
let properties_future = tokio::time::timeout(self.timeout, properties_future);
let properties_future = tokio::time::timeout(self.small_timeout, properties_future);
let res = tokio::select! {
res = properties_future => res,
@@ -521,7 +531,12 @@ impl RemoteStorage for AzureBlobStorage {
});
}
self.download_for_builder(builder, cancel).await
let timeout = match opts.kind {
DownloadKind::Small => self.small_timeout,
DownloadKind::Large => self.timeout,
};
self.download_for_builder(builder, timeout, cancel).await
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {

View File

@@ -24,6 +24,13 @@ pub struct RemoteStorageConfig {
skip_serializing_if = "is_default_timeout"
)]
pub timeout: Duration,
/// Alternative timeout used for metadata objects which are expected to be small
#[serde(
with = "humantime_serde",
default = "default_small_timeout",
skip_serializing_if = "is_default_small_timeout"
)]
pub small_timeout: Duration,
}
impl RemoteStorageKind {
@@ -40,10 +47,18 @@ fn default_timeout() -> Duration {
RemoteStorageConfig::DEFAULT_TIMEOUT
}
fn default_small_timeout() -> Duration {
RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
}
fn is_default_timeout(d: &Duration) -> bool {
*d == RemoteStorageConfig::DEFAULT_TIMEOUT
}
fn is_default_small_timeout(d: &Duration) -> bool {
*d == RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
}
/// A kind of a remote storage to connect to, with its connection configuration.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(untagged)]
@@ -184,6 +199,7 @@ fn serialize_storage_class<S: serde::Serializer>(
impl RemoteStorageConfig {
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
pub const DEFAULT_SMALL_TIMEOUT: Duration = std::time::Duration::from_secs(30);
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
Ok(utils::toml_edit_ext::deserialize_item(toml)?)
@@ -219,7 +235,8 @@ timeout = '5s'";
storage: RemoteStorageKind::LocalFs {
local_path: Utf8PathBuf::from(".")
},
timeout: Duration::from_secs(5)
timeout: Duration::from_secs(5),
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
}
);
}
@@ -247,7 +264,8 @@ timeout = '5s'";
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
upload_storage_class: Some(StorageClass::IntelligentTiering),
}),
timeout: Duration::from_secs(7)
timeout: Duration::from_secs(7),
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
}
);
}
@@ -299,7 +317,8 @@ timeout = '5s'";
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
}),
timeout: Duration::from_secs(7)
timeout: Duration::from_secs(7),
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
}
);
}

View File

@@ -178,6 +178,15 @@ pub struct DownloadOpts {
/// The end of the byte range to download, or unbounded. Must be after the
/// start bound.
pub byte_end: Bound<u64>,
/// Indicate whether we're downloading something small or large: this indirectly controls
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
/// for layer files
pub kind: DownloadKind,
}
pub enum DownloadKind {
Large,
Small,
}
impl Default for DownloadOpts {
@@ -186,6 +195,7 @@ impl Default for DownloadOpts {
etag: Default::default(),
byte_start: Bound::Unbounded,
byte_end: Bound::Unbounded,
kind: DownloadKind::Large,
}
}
}
@@ -584,6 +594,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
impl GenericRemoteStorage {
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
let timeout = storage_config.timeout;
// If somkeone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs { local_path: path } => {
info!("Using fs root '{path}' as a remote storage");
@@ -606,7 +620,11 @@ impl GenericRemoteStorage {
.unwrap_or("<AZURE_STORAGE_ACCOUNT>");
info!("Using azure container '{}' in account '{storage_account}' in region '{}' as a remote storage, prefix in container: '{:?}'",
azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container);
Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config, timeout)?))
Self::AzureBlob(Arc::new(AzureBlobStorage::new(
azure_config,
timeout,
small_timeout,
)?))
}
})
}

View File

@@ -219,7 +219,8 @@ async fn create_azure_client(
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response,
}),
timeout: Duration::from_secs(120),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config)

View File

@@ -396,6 +396,7 @@ async fn create_s3_client(
upload_storage_class: None,
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config)

View File

@@ -838,6 +838,7 @@ mod test {
local_path: remote_fs_dir.clone(),
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
};
let storage = GenericRemoteStorage::from_config(&storage_config)
.await

View File

@@ -5423,6 +5423,7 @@ pub(crate) mod harness {
local_path: remote_fs_dir.clone(),
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
};
let remote_storage = GenericRemoteStorage::from_config(&config).await.unwrap();
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));

View File

@@ -30,7 +30,9 @@ use crate::tenant::Generation;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
use remote_storage::{
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use utils::pausable_failpoint;
@@ -345,12 +347,13 @@ pub async fn list_remote_timelines(
async fn do_download_remote_path_retry_forever(
storage: &GenericRemoteStorage,
remote_path: &RemotePath,
download_opts: DownloadOpts,
cancel: &CancellationToken,
) -> Result<(Vec<u8>, SystemTime), DownloadError> {
download_retry_forever(
|| async {
let download = storage
.download(remote_path, &DownloadOpts::default(), cancel)
.download(remote_path, &download_opts, cancel)
.await?;
let mut bytes = Vec::new();
@@ -377,8 +380,13 @@ async fn do_download_tenant_manifest(
) -> Result<(TenantManifest, Generation, SystemTime), DownloadError> {
let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
let download_opts = DownloadOpts {
kind: DownloadKind::Small,
..Default::default()
};
let (manifest_bytes, manifest_bytes_mtime) =
do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
let tenant_manifest = TenantManifest::from_json_bytes(&manifest_bytes)
.with_context(|| format!("deserialize tenant manifest file at {remote_path:?}"))
@@ -398,8 +406,13 @@ async fn do_download_index_part(
timeline_id.expect("A timeline ID is always provided when downloading an index");
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
let download_opts = DownloadOpts {
kind: DownloadKind::Small,
..Default::default()
};
let (index_part_bytes, index_part_mtime) =
do_download_remote_path_retry_forever(storage, &remote_path, cancel).await?;
do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| format!("deserialize index part file at {remote_path:?}"))

View File

@@ -49,7 +49,7 @@ use futures::Future;
use metrics::UIntGauge;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use remote_storage::{DownloadError, DownloadOpts, Etag, GenericRemoteStorage};
use remote_storage::{DownloadError, DownloadKind, DownloadOpts, Etag, GenericRemoteStorage};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, warn, Instrument};
@@ -946,6 +946,7 @@ impl<'a> TenantDownloader<'a> {
let cancel = &self.secondary_state.cancel;
let opts = DownloadOpts {
etag: prev_etag.cloned(),
kind: DownloadKind::Small,
..Default::default()
};

View File

@@ -4,7 +4,8 @@ use anyhow::Context;
use bytes::Bytes;
use postgres_ffi::ControlFileData;
use remote_storage::{
Download, DownloadError, DownloadOpts, GenericRemoteStorage, Listing, ListingObject, RemotePath,
Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing,
ListingObject, RemotePath,
};
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
@@ -239,6 +240,7 @@ impl RemoteStorageWrapper {
.download(
path,
&DownloadOpts {
kind: DownloadKind::Large,
etag: None,
byte_start: Bound::Included(start_inclusive),
byte_end: Bound::Excluded(end_exclusive)

View File

@@ -486,6 +486,7 @@ mod tests {
upload_storage_class: None,
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
})
);
assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
@@ -545,6 +546,7 @@ mod tests {
local_path: tmpdir.to_path_buf(),
},
timeout: std::time::Duration::from_secs(120),
small_timeout: std::time::Duration::from_secs(30),
};
let storage = GenericRemoteStorage::from_config(&remote_storage_config)
.await

View File

@@ -41,6 +41,7 @@ serde_json.workspace = true
strum.workspace = true
strum_macros.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tokio = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true }
tokio-io-timeout.workspace = true

View File

@@ -6,6 +6,7 @@ mod benchutils;
use std::io::Write as _;
use benchutils::Env;
use bytes::BytesMut;
use camino_tempfile::tempfile;
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
use itertools::Itertools as _;
@@ -23,6 +24,9 @@ const KB: usize = 1024;
const MB: usize = 1024 * KB;
const GB: usize = 1024 * MB;
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
// Register benchmarks with Criterion.
criterion_group!(
name = benches;
@@ -30,7 +34,8 @@ criterion_group!(
targets = bench_process_msg,
bench_wal_acceptor,
bench_wal_acceptor_throughput,
bench_file_write
bench_file_write,
bench_bytes_reserve,
);
criterion_main!(benches);
@@ -341,3 +346,26 @@ fn bench_file_write(c: &mut Criterion) {
Ok(())
}
}
/// Benchmarks the cost of memory allocations when receiving WAL messages. This emulates the logic
/// in FeMessage::parse, which extends the read buffer. It is primarily intended to test jemalloc.
fn bench_bytes_reserve(c: &mut Criterion) {
let mut g = c.benchmark_group("bytes_reserve");
for size in [1, 64, KB, 8 * KB, 128 * KB] {
g.throughput(criterion::Throughput::Bytes(size as u64));
g.bench_function(format!("size={size}"), |b| run_bench(b, size).unwrap());
}
fn run_bench(b: &mut Bencher, size: usize) -> anyhow::Result<()> {
let mut bytes = BytesMut::new();
let data = vec![0; size];
b.iter(|| {
bytes.reserve(size);
bytes.extend_from_slice(&data);
bytes.split_to(size).freeze();
});
Ok(())
}
}

View File

@@ -48,6 +48,9 @@ use utils::{
tcp_listener,
};
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
const PID_FILE_NAME: &str = "safekeeper.pid";
const ID_FILE_NAME: &str = "safekeeper.id";

View File

@@ -305,7 +305,7 @@ impl std::ops::Add for AffinityScore {
/// Hint for whether this is a sincere attempt to schedule, or a speculative
/// check for where we _would_ schedule (done during optimization)
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum ScheduleMode {
Normal,
Speculative,
@@ -319,7 +319,7 @@ impl Default for ScheduleMode {
// For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
// it for many shards in the same tenant.
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub(crate) struct ScheduleContext {
/// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
pub(crate) nodes: HashMap<NodeId, AffinityScore>,
@@ -331,6 +331,14 @@ pub(crate) struct ScheduleContext {
}
impl ScheduleContext {
pub(crate) fn new(mode: ScheduleMode) -> Self {
Self {
nodes: HashMap::new(),
attached_nodes: HashMap::new(),
mode,
}
}
/// Input is a list of nodes we would like to avoid using again within this context. The more
/// times a node is passed into this call, the less inclined we are to use it.
pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
@@ -355,6 +363,11 @@ impl ScheduleContext {
pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
self.attached_nodes.get(&node_id).copied().unwrap_or(0)
}
#[cfg(test)]
pub(crate) fn attach_count(&self) -> usize {
self.attached_nodes.values().sum()
}
}
pub(crate) enum RefCountUpdate {

View File

@@ -1,3 +1,6 @@
pub mod chaos_injector;
mod context_iterator;
use hyper::Uri;
use std::{
borrow::Cow,
@@ -95,7 +98,7 @@ use crate::{
},
};
pub mod chaos_injector;
use context_iterator::TenantShardContextIterator;
// For operations that should be quick, like attaching a new tenant
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -5498,49 +5501,51 @@ impl Service {
let mut tenants_affected: usize = 0;
for (tenant_shard_id, tenant_shard) in tenants {
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
// not assume our knowledge of the node's configuration is accurate until it comes back online
observed_loc.conf = None;
}
for (_tenant_id, mut schedule_context, shards) in
TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
{
for tenant_shard in shards {
let tenant_shard_id = tenant_shard.tenant_shard_id;
if let Some(observed_loc) =
tenant_shard.observed.locations.get_mut(&node_id)
{
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
// not assume our knowledge of the node's configuration is accurate until it comes back online
observed_loc.conf = None;
}
if nodes.len() == 1 {
// Special case for single-node cluster: there is no point trying to reschedule
// any tenant shards: avoid doing so, in order to avoid spewing warnings about
// failures to schedule them.
continue;
}
if nodes.len() == 1 {
// Special case for single-node cluster: there is no point trying to reschedule
// any tenant shards: avoid doing so, in order to avoid spewing warnings about
// failures to schedule them.
continue;
}
if !nodes
.values()
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
{
// Special case for when all nodes are unavailable and/or unschedulable: there is no point
// trying to reschedule since there's nowhere else to go. Without this
// branch we incorrectly detach tenants in response to node unavailability.
continue;
}
if !nodes
.values()
.any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
{
// Special case for when all nodes are unavailable and/or unschedulable: there is no point
// trying to reschedule since there's nowhere else to go. Without this
// branch we incorrectly detach tenants in response to node unavailability.
continue;
}
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();
// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
// for tenants without secondary locations: if they have a secondary location, then this
// schedule() call is just promoting an existing secondary)
let mut schedule_context = ScheduleContext::default();
match tenant_shard.schedule(scheduler, &mut schedule_context) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
// TODO: give TenantShard a scheduling error attribute to be queried later.
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
tenants_affected += 1;
};
match tenant_shard.schedule(scheduler, &mut schedule_context) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
// TODO: give TenantShard a scheduling error attribute to be queried later.
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
tenants_affected += 1;
};
}
}
}
}
@@ -6011,14 +6016,8 @@ impl Service {
let (nodes, tenants, _scheduler) = locked.parts_mut();
let pageservers = nodes.clone();
let mut schedule_context = ScheduleContext::default();
let mut reconciles_spawned = 0;
for (tenant_shard_id, shard) in tenants.iter_mut() {
if tenant_shard_id.is_shard_zero() {
schedule_context = ScheduleContext::default();
}
for shard in tenants.values_mut() {
// Skip checking if this shard is already enqueued for reconciliation
if shard.delayed_reconcile && self.reconciler_concurrency.available_permits() == 0 {
// If there is something delayed, then return a nonzero count so that
@@ -6033,8 +6032,6 @@ impl Service {
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
reconciles_spawned += 1;
}
schedule_context.avoid(&shard.intent.all_pageservers());
}
reconciles_spawned
@@ -6103,95 +6100,62 @@ impl Service {
}
fn optimize_all_plan(&self) -> Vec<(TenantShardId, ScheduleOptimization)> {
let mut schedule_context = ScheduleContext::default();
let mut tenant_shards: Vec<&TenantShard> = Vec::new();
// How many candidate optimizations we will generate, before evaluating them for readniess: setting
// this higher than the execution limit gives us a chance to execute some work even if the first
// few optimizations we find are not ready.
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
let mut work = Vec::new();
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
for (tenant_shard_id, shard) in tenants.iter() {
if tenant_shard_id.is_shard_zero() {
// Reset accumulators on the first shard in a tenant
schedule_context = ScheduleContext::default();
schedule_context.mode = ScheduleMode::Speculative;
tenant_shards.clear();
}
if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
break;
}
match shard.get_scheduling_policy() {
ShardSchedulingPolicy::Active => {
// Ok to do optimization
for (_tenant_id, schedule_context, shards) in
TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
{
for shard in shards {
if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
break;
}
ShardSchedulingPolicy::Essential
| ShardSchedulingPolicy::Pause
| ShardSchedulingPolicy::Stop => {
// Policy prevents optimizing this shard.
continue;
match shard.get_scheduling_policy() {
ShardSchedulingPolicy::Active => {
// Ok to do optimization
}
ShardSchedulingPolicy::Essential
| ShardSchedulingPolicy::Pause
| ShardSchedulingPolicy::Stop => {
// Policy prevents optimizing this shard.
continue;
}
}
}
// Accumulate the schedule context for all the shards in a tenant: we must have
// the total view of all shards before we can try to optimize any of them.
schedule_context.avoid(&shard.intent.all_pageservers());
if let Some(attached) = shard.intent.get_attached() {
schedule_context.push_attached(*attached);
}
tenant_shards.push(shard);
// Once we have seen the last shard in the tenant, proceed to search across all shards
// in the tenant for optimizations
if shard.shard.number.0 == shard.shard.count.count() - 1 {
if tenant_shards.iter().any(|s| s.reconciler.is_some()) {
if !matches!(shard.splitting, SplitState::Idle)
|| matches!(shard.policy, PlacementPolicy::Detached)
|| shard.reconciler.is_some()
{
// Do not start any optimizations while another change to the tenant is ongoing: this
// is not necessary for correctness, but simplifies operations and implicitly throttles
// optimization changes to happen in a "trickle" over time.
continue;
}
if tenant_shards.iter().any(|s| {
!matches!(s.splitting, SplitState::Idle)
|| matches!(s.policy, PlacementPolicy::Detached)
}) {
// Never attempt to optimize a tenant that is currently being split, or
// a tenant that is meant to be detached
continue;
}
// TODO: optimization calculations are relatively expensive: create some fast-path for
// the common idle case (avoiding the search on tenants that we have recently checked)
for shard in &tenant_shards {
if let Some(optimization) =
// If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
// its primary location based on soft constraints, cut it over.
shard.optimize_attachment(nodes, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
} else if let Some(optimization) =
// If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
// better placed on another node, based on ScheduleContext, then adjust it. This
// covers cases like after a shard split, where we might have too many shards
// in the same tenant with secondary locations on the node where they originally split.
shard.optimize_secondary(scheduler, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
}
// TODO: extend this mechanism to prefer attaching on nodes with fewer attached
// tenants (i.e. extend schedule state to distinguish attached from secondary counts),
// for the total number of attachments on a node (not just within a tenant.)
if let Some(optimization) =
// If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
// its primary location based on soft constraints, cut it over.
shard.optimize_attachment(nodes, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
} else if let Some(optimization) =
// If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
// better placed on another node, based on ScheduleContext, then adjust it. This
// covers cases like after a shard split, where we might have too many shards
// in the same tenant with secondary locations on the node where they originally split.
shard.optimize_secondary(scheduler, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
}
}
}

View File

@@ -0,0 +1,139 @@
use std::collections::BTreeMap;
use utils::id::TenantId;
use utils::shard::TenantShardId;
use crate::scheduler::{ScheduleContext, ScheduleMode};
use crate::tenant_shard::TenantShard;
/// When making scheduling decisions, it is useful to have the ScheduleContext for a whole
/// tenant while considering the individual shards within it. This iterator is a helper
/// that gathers all the shards in a tenant and then yields them together with a ScheduleContext
/// for the tenant.
pub(super) struct TenantShardContextIterator<'a> {
schedule_mode: ScheduleMode,
inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>,
}
impl<'a> TenantShardContextIterator<'a> {
pub(super) fn new(
tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
schedule_mode: ScheduleMode,
) -> Self {
Self {
schedule_mode,
inner: tenants.iter_mut(),
}
}
}
impl<'a> Iterator for TenantShardContextIterator<'a> {
type Item = (TenantId, ScheduleContext, Vec<&'a mut TenantShard>);
fn next(&mut self) -> Option<Self::Item> {
let mut tenant_shards = Vec::new();
let mut schedule_context = ScheduleContext::new(self.schedule_mode.clone());
loop {
let (tenant_shard_id, shard) = self.inner.next()?;
if tenant_shard_id.is_shard_zero() {
// Cleared on last shard of previous tenant
assert!(tenant_shards.is_empty());
}
// Accumulate the schedule context for all the shards in a tenant
schedule_context.avoid(&shard.intent.all_pageservers());
if let Some(attached) = shard.intent.get_attached() {
schedule_context.push_attached(*attached);
}
tenant_shards.push(shard);
if tenant_shard_id.shard_number.0 == tenant_shard_id.shard_count.count() - 1 {
return Some((tenant_shard_id.tenant_id, schedule_context, tenant_shards));
}
}
}
}
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, str::FromStr};
use pageserver_api::controller_api::PlacementPolicy;
use utils::shard::{ShardCount, ShardNumber};
use crate::{
scheduler::test_utils::make_test_nodes, service::Scheduler,
tenant_shard::tests::make_test_tenant_with_id,
};
use super::*;
#[test]
fn test_context_iterator() {
// Hand-crafted tenant IDs to ensure they appear in the expected order when put into
// a btreemap & iterated
let mut t_1_shards = make_test_tenant_with_id(
TenantId::from_str("af0480929707ee75372337efaa5ecf96").unwrap(),
PlacementPolicy::Attached(1),
ShardCount(1),
None,
);
let t_2_shards = make_test_tenant_with_id(
TenantId::from_str("bf0480929707ee75372337efaa5ecf96").unwrap(),
PlacementPolicy::Attached(1),
ShardCount(4),
None,
);
let mut t_3_shards = make_test_tenant_with_id(
TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(),
PlacementPolicy::Attached(1),
ShardCount(1),
None,
);
let t1_id = t_1_shards[0].tenant_shard_id.tenant_id;
let t2_id = t_2_shards[0].tenant_shard_id.tenant_id;
let t3_id = t_3_shards[0].tenant_shard_id.tenant_id;
let mut tenants = BTreeMap::new();
tenants.insert(t_1_shards[0].tenant_shard_id, t_1_shards.pop().unwrap());
for shard in t_2_shards {
tenants.insert(shard.tenant_shard_id, shard);
}
tenants.insert(t_3_shards[0].tenant_shard_id, t_3_shards.pop().unwrap());
let nodes = make_test_nodes(3, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut context = ScheduleContext::default();
for shard in tenants.values_mut() {
shard.schedule(&mut scheduler, &mut context).unwrap();
}
let mut iter = TenantShardContextIterator::new(&mut tenants, ScheduleMode::Speculative);
let (tenant_id, context, shards) = iter.next().unwrap();
assert_eq!(tenant_id, t1_id);
assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
assert_eq!(shards.len(), 1);
assert_eq!(context.attach_count(), 1);
let (tenant_id, context, shards) = iter.next().unwrap();
assert_eq!(tenant_id, t2_id);
assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
assert_eq!(shards[1].tenant_shard_id.shard_number, ShardNumber(1));
assert_eq!(shards[2].tenant_shard_id.shard_number, ShardNumber(2));
assert_eq!(shards[3].tenant_shard_id.shard_number, ShardNumber(3));
assert_eq!(shards.len(), 4);
assert_eq!(context.attach_count(), 4);
let (tenant_id, context, shards) = iter.next().unwrap();
assert_eq!(tenant_id, t3_id);
assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
assert_eq!(shards.len(), 1);
assert_eq!(context.attach_count(), 1);
for shard in tenants.values_mut() {
shard.intent.clear(&mut scheduler);
}
}
}

View File

@@ -1574,13 +1574,20 @@ pub(crate) mod tests {
)
}
fn make_test_tenant(
pub(crate) fn make_test_tenant(
policy: PlacementPolicy,
shard_count: ShardCount,
preferred_az: Option<AvailabilityZone>,
) -> Vec<TenantShard> {
let tenant_id = TenantId::generate();
make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
}
pub(crate) fn make_test_tenant_with_id(
tenant_id: TenantId,
policy: PlacementPolicy,
shard_count: ShardCount,
preferred_az: Option<AvailabilityZone>,
) -> Vec<TenantShard> {
(0..shard_count.count())
.map(|i| {
let shard_number = ShardNumber(i);

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.2",
"3c15b6565f6c8d36d169ed9ea7412cf90cfb2a8f"
"faebe5e5aff5687908504453623778f8515529db"
],
"v16": [
"16.6",
"f5cfc6fa898544050e821ac688adafece1ac3cff"
"13e9e3539419003e79bd9aa29e1bc44f3fd555dd"
],
"v15": [
"15.10",
"aed79ee87b94779cc52ec13e3b74eba6ada93f05"
"d929b9a8b9f32f6fe5a0eac3e6e963f0e44e27e6"
],
"v14": [
"14.15",
"284ae56be2397fd3eaf20777fa220b2d0ad968f5"
"c1989c934d46e04e78b3c496c8a34bcd40ddceeb"
]
}