Compare commits

..

33 Commits

Author SHA1 Message Date
Konstantin Knizhnik
3d65cb9580 Correctly handle cancel of prewarm query 2025-04-26 07:49:32 +03:00
Konstantin Knizhnik
13be92fee3 Address review comments 2025-04-26 07:49:32 +03:00
Konstantin Knizhnik
6d533122e4 Address review comments 2025-04-26 07:49:32 +03:00
Konstantin Knizhnik
90263c3e19 Address review comments 2025-04-26 07:49:31 +03:00
Konstantin Knizhnik
879984ceaa Update pgxn/neon/file_cache.c
Co-authored-by: Matthias van de Meent <matthias@neon.tech>
2025-04-26 07:49:31 +03:00
Konstantin Knizhnik
401a011006 Make ruff happy 2025-04-26 07:49:31 +03:00
Konstantin Knizhnik
05381d1dda Prefetch using background workers 2025-04-26 07:49:31 +03:00
Konstantin Knizhnik
e670a214c5 Add check for number of pinned pages 2025-04-26 07:49:30 +03:00
Konstantin Knizhnik
c293b96e0a Cancel prewarm is LFC limit is reached 2025-04-26 07:49:30 +03:00
Konstantin Knizhnik
c3466e204f Redue test_lfc_prewarm test duration 2025-04-26 07:49:30 +03:00
Konstantin Knizhnik
f78ad4904a Fix lfc_prewarm 2025-04-26 07:49:30 +03:00
Konstantin Knizhnik
1bf91b4a01 Fix bug in calculating LFC cache entry size 2025-04-26 07:49:30 +03:00
Konstantin Knizhnik
8132f51712 Fix check for file cache size chunk size 2025-04-26 07:49:29 +03:00
Konstantin Knizhnik
13ebbbfb65 Update description of neon.file_cache_prewarm_limit 2025-04-26 07:49:29 +03:00
Konstantin Knizhnik
ff264158c5 Add n_pages top FileCacheState 2025-04-26 07:49:29 +03:00
Konstantin Knizhnik
30080e4422 Address review comments 2025-04-26 07:49:29 +03:00
Konstantin Knizhnik
fa4bd2b901 Rebase with main 2025-04-26 07:49:28 +03:00
Konstantin Knizhnik
fcfaf0a3d0 Fix bug in LFC state size calculation 2025-04-26 07:49:28 +03:00
Konstantin Knizhnik
00a724a533 Fix compiler warnings 2025-04-26 07:49:28 +03:00
Konstantin Knizhnik
93f7e89785 Update pgxn/neon/file_cache.c
Co-authored-by: Matthias van de Meent <matthias@neon.tech>
2025-04-26 07:49:28 +03:00
Konstantin Knizhnik
b382304b02 Make ruff happy 2025-04-26 07:49:27 +03:00
Konstantin Knizhnik
ac824227a6 Make it possible to specify LFC chunk size in postgresql.conf 2025-04-26 07:49:27 +03:00
Konstantin Knizhnik
680ef72954 Use Min(lfc_prewarm_batnch, readahead-buffer_size) 2025-04-26 07:49:27 +03:00
Konstantin Knizhnik
a47089bb30 Use standard prefetch mechanism for geting prewarm results fropm page server 2025-04-26 07:49:27 +03:00
Konstantin Knizhnik
65c263c2ea Fix merge conflict 2025-04-26 07:49:26 +03:00
Konstantin Knizhnik
971d20169b Check for concurrent prewarm 2025-04-26 07:49:26 +03:00
Konstantin Knizhnik
f98e54186f Implement prewarm using lfc_prefetch 2025-04-26 07:49:26 +03:00
Konstantin Knizhnik
f370046e46 Add more comments explaining correctness of lfc_prefetch 2025-04-26 07:49:26 +03:00
Konstantin Knizhnik
ccab7d0234 Store prefetch results in LFC cache once as soon as they are received 2025-04-26 07:49:25 +03:00
Lokesh
459d51974c doc: minor updates to consumption-metrics document (#7153)
## Problem
Proposed minor changes to the `consumption_metrics` document.

## Summary of changes
- Fixed minor typos in the document.
- Minor formatting in the description of metrics `timeline_logical_size`
and `synthetic_storage_size`. Makes this consistent as with description
  of other metrics in the document.

## Checklist before requesting a review

- [x] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

Co-authored-by: Mikhail Kot <mikhail@neon.tech>
2025-04-25 19:46:40 +00:00
StepSecurity Bot
902d361107 CI/CD Hardening: Fixing StepSecurity Flagged Issues (#11724)
### Summary
I'm fixing one or more of the following CI/CD misconfigurations to
improve security. Please feel free to leave a comment if you think the
current permissions for the GITHUB_TOKEN should not be restricted so I
can take a note of it as accepted behaviour.

- Restrict permissions for GITHUB_TOKEN
- Add step-security/harden-runner
- Pin Actions to a full length commit SHA

### Security Fixes
will fix https://github.com/neondatabase/cloud/issues/26141
2025-04-25 14:36:45 +00:00
Dmitrii Kovalkov
ef53a76434 storage_broker: https handler (#11603)
## Problem
Broker supports only HTTP, no HTTPS
- Closes: https://github.com/neondatabase/cloud/issues/27492

## Summary of changes
- Add `listen_https_addr`, `ssl_key_file`, `ssl_cert_file`,
`ssl_cert_reload_period` arguments to storage broker
- Make `listen_addr` argument optional
- Listen https in storage broker
- Support https for storage broker request in neon_local
- Add `use_https_storage_broker_api` option to NeonEnvBuilder
2025-04-25 14:28:56 +00:00
Vlad Lazar
6f0046b688 storage_controller: ensure mutual exclusion for imports and shard splits (#11632)
## Problem

Shard splits break timeline imports.

## Summary of Changes

Ensure mutual exclusion for imports and shard splits.

On the shard split code path:
1. Right before shard splitting, check the database to ensure that
no-import is on-going for the tenant. Exclusion is guaranteed because
this validation is done while holding the exclusive tenant lock.
Timeline creation (and import creation implicitly) requires a shared
tenant lock.
2. When selecting a shard to split, use the in-mem state to exclude
shards with an on-going import. This is opportunistic since an import
might start after the check, but allows shard splits to make progres
instead of continously retrying to split the same shard.

On the timeline creation code path:
1. Check the in-memory splitting flag on all shards of the tenant. If
any of them are splitting, error out asking the client to retry. On the
happy path this is not required, due to the tenant lock set-up described
above, but it covers the case where we restart with a pending
shard-split.

Closes https://github.com/neondatabase/neon/issues/11567
2025-04-25 11:46:15 +00:00
38 changed files with 1240 additions and 349 deletions

View File

@@ -19,7 +19,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@v2
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
with:
egress-policy: audit

View File

@@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@v2
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
with:
egress-policy: audit

View File

@@ -14,7 +14,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@v2
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
with:
egress-policy: audit

View File

@@ -28,7 +28,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@v2
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
with:
egress-policy: audit
@@ -75,7 +75,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@v2
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
with:
egress-policy: audit

View File

@@ -41,7 +41,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@v2
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
with:
egress-policy: audit

View File

@@ -35,7 +35,7 @@ jobs:
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@v2
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
with:
egress-policy: audit
@@ -73,7 +73,7 @@ jobs:
}}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@v2
uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0
with:
egress-policy: audit

3
Cargo.lock generated
View File

@@ -6616,12 +6616,14 @@ dependencies = [
"anyhow",
"async-stream",
"bytes",
"camino",
"clap",
"const_format",
"futures",
"futures-core",
"futures-util",
"http-body-util",
"http-utils",
"humantime",
"hyper 1.4.1",
"hyper-util",
@@ -6631,6 +6633,7 @@ dependencies = [
"prost 0.13.3",
"rustls 0.23.18",
"tokio",
"tokio-rustls 0.26.0",
"tonic",
"tonic-build",
"tracing",

View File

@@ -17,8 +17,10 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
NeonLocalInitPageserverConf, SafekeeperConf,
@@ -28,7 +30,6 @@ use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{FlockArg, flock};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
@@ -988,7 +989,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
NeonLocalInitConf {
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
broker: NeonBroker {
listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()),
listen_https_addr: None,
},
safekeepers: vec![SafekeeperConf {
id: DEFAULT_SAFEKEEPER_ID,
@@ -1777,7 +1779,8 @@ async fn handle_endpoint_storage(
async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
match subcmd {
StorageBrokerCmd::Start(args) => {
if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await {
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.start(&args.start_timeout).await {
eprintln!("broker start failed: {e}");
exit(1);
}
@@ -1785,7 +1788,8 @@ async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::Local
StorageBrokerCmd::Stop(_args) => {
// FIXME: stop_mode unused
if let Err(e) = broker::stop_broker_process(env) {
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.stop() {
eprintln!("broker stop failed: {e}");
exit(1);
}
@@ -1835,8 +1839,11 @@ async fn handle_start_all_impl(
#[allow(clippy::redundant_closure_call)]
(|| {
js.spawn(async move {
let retry_timeout = retry_timeout;
broker::start_broker_process(env, &retry_timeout).await
let storage_broker = StorageBroker::from_env(env);
storage_broker
.start(&retry_timeout)
.await
.map_err(|e| e.context("start storage_broker"))
});
js.spawn(async move {
@@ -1991,7 +1998,8 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
if let Err(e) = broker::stop_broker_process(env) {
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.stop() {
eprintln!("neon broker stop failed: {e:#}");
}

View File

@@ -3,60 +3,86 @@
//! In the local test environment, the storage broker stores its data directly in
//!
//! ```text
//! .neon
//! .neon/storage_broker
//! ```
use std::time::Duration;
use anyhow::Context;
use camino::Utf8PathBuf;
use crate::{background_process, local_env};
use crate::{background_process, local_env::LocalEnv};
pub async fn start_broker_process(
env: &local_env::LocalEnv,
retry_timeout: &Duration,
) -> anyhow::Result<()> {
let broker = &env.broker;
let listen_addr = &broker.listen_addr;
print!("Starting neon broker at {}", listen_addr);
let args = [format!("--listen-addr={listen_addr}")];
let client = reqwest::Client::new();
background_process::start_process(
"storage_broker",
&env.base_data_dir,
&env.storage_broker_bin(),
args,
[],
background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)),
retry_timeout,
|| async {
let url = broker.client_url();
let status_url = url.join("status").with_context(|| {
format!("Failed to append /status path to broker endpoint {url}")
})?;
let request = client
.get(status_url)
.build()
.with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
match client.execute(request).await {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
}
},
)
.await
.context("Failed to spawn storage_broker subprocess")?;
Ok(())
pub struct StorageBroker {
env: LocalEnv,
}
pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
}
impl StorageBroker {
/// Create a new `StorageBroker` instance from the environment.
pub fn from_env(env: &LocalEnv) -> Self {
Self { env: env.clone() }
}
fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid"))
.expect("non-Unicode path")
pub fn initialize(&self) -> anyhow::Result<()> {
if self.env.generate_local_ssl_certs {
self.env.generate_ssl_cert(
&self.env.storage_broker_data_dir().join("server.crt"),
&self.env.storage_broker_data_dir().join("server.key"),
)?;
}
Ok(())
}
/// Start the storage broker process.
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
let broker = &self.env.broker;
print!("Starting neon broker at {}", broker.client_url());
let mut args = Vec::new();
if let Some(addr) = &broker.listen_addr {
args.push(format!("--listen-addr={addr}"));
}
if let Some(addr) = &broker.listen_https_addr {
args.push(format!("--listen-https-addr={addr}"));
}
let client = self.env.create_http_client();
background_process::start_process(
"storage_broker",
&self.env.storage_broker_data_dir(),
&self.env.storage_broker_bin(),
args,
[],
background_process::InitialPidFile::Create(self.pid_file_path()),
retry_timeout,
|| async {
let url = broker.client_url();
let status_url = url.join("status").with_context(|| {
format!("Failed to append /status path to broker endpoint {url}")
})?;
let request = client.get(status_url).build().with_context(|| {
format!("Failed to construct request to broker endpoint {url}")
})?;
match client.execute(request).await {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
}
},
)
.await
.context("Failed to spawn storage_broker subprocess")?;
Ok(())
}
/// Stop the storage broker process.
pub fn stop(&self) -> anyhow::Result<()> {
background_process::stop_process(true, "storage_broker", &self.pid_file_path())
}
/// Get the path to the PID file for the storage broker.
fn pid_file_path(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid"))
.expect("non-Unicode path")
}
}

View File

@@ -4,7 +4,7 @@
//! script which will use local paths.
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
@@ -14,11 +14,12 @@ use anyhow::{Context, bail};
use clap::ValueEnum;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::Url;
use reqwest::{Certificate, Url};
use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::broker::StorageBroker;
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -157,11 +158,16 @@ pub struct EndpointStorageConf {
}
/// Broker config for cluster internal communication.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Default)]
#[serde(default)]
pub struct NeonBroker {
/// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
pub listen_addr: SocketAddr,
/// Broker listen HTTP address for storage nodes coordination, e.g. '127.0.0.1:50051'.
/// At least one of listen_addr or listen_https_addr must be set.
pub listen_addr: Option<SocketAddr>,
/// Broker listen HTTPS address for storage nodes coordination, e.g. '127.0.0.1:50051'.
/// At least one of listen_addr or listen_https_addr must be set.
/// listen_https_addr is preferred over listen_addr in neon_local.
pub listen_https_addr: Option<SocketAddr>,
}
/// A part of storage controller's config the neon_local knows about.
@@ -235,18 +241,19 @@ impl Default for NeonStorageControllerConf {
}
}
// Dummy Default impl to satisfy Deserialize derive.
impl Default for NeonBroker {
fn default() -> Self {
NeonBroker {
listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
}
}
}
impl NeonBroker {
pub fn client_url(&self) -> Url {
Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
let url = if let Some(addr) = self.listen_https_addr {
format!("https://{}", addr)
} else {
format!(
"http://{}",
self.listen_addr
.expect("at least one address should be set")
)
};
Url::parse(&url).expect("failed to construct url")
}
}
@@ -441,6 +448,10 @@ impl LocalEnv {
self.base_data_dir.join("endpoints")
}
pub fn storage_broker_data_dir(&self) -> PathBuf {
self.base_data_dir.join("storage_broker")
}
pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
self.base_data_dir
.join(format!("pageserver_{pageserver_id}"))
@@ -503,6 +514,23 @@ impl LocalEnv {
)
}
/// Creates HTTP client with local SSL CA certificates.
pub fn create_http_client(&self) -> reqwest::Client {
let ssl_ca_certs = self.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
http_client
.build()
.expect("HTTP client should construct with no error")
}
/// Inspect the base data directory and extract the instance id and instance directory path
/// for all storage controller instances
pub async fn storage_controller_instances(&self) -> std::io::Result<Vec<(u8, PathBuf)>> {
@@ -911,6 +939,12 @@ impl LocalEnv {
// create endpoints dir
fs::create_dir_all(env.endpoints_path())?;
// create storage broker dir
fs::create_dir_all(env.storage_broker_data_dir())?;
StorageBroker::from_env(&env)
.initialize()
.context("storage broker init failed")?;
// create safekeeper dirs
for safekeeper in &env.safekeepers {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?;

View File

@@ -21,7 +21,6 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
use postgres_connection::{PgConnectionConfig, parse_host_port};
use reqwest::Certificate;
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -51,19 +50,6 @@ impl PageServerNode {
parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("Client constructs with no errors");
let endpoint = if env.storage_controller.use_https_pageserver_api {
format!(
"https://{}",
@@ -80,7 +66,7 @@ impl PageServerNode {
conf: conf.clone(),
env: env.clone(),
http_client: mgmt_api::Client::new(
http_client,
env.create_http_client(),
endpoint,
{
match conf.http_auth_type {

View File

@@ -87,7 +87,7 @@ impl SafekeeperNode {
conf: conf.clone(),
pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port),
env: env.clone(),
http_client: reqwest::Client::new(),
http_client: env.create_http_client(),
http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port),
listen_addr,
}

View File

@@ -20,7 +20,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::{Certificate, Method};
use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -153,24 +153,11 @@ impl StorageController {
}
};
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("HTTP client should construct with no error");
Self {
env: env.clone(),
private_key,
public_key,
client: http_client,
client: env.create_http_client(),
config: env.storage_controller.clone(),
listen_port: OnceLock::default(),
}

View File

@@ -13,7 +13,7 @@ For design details see [the RFC](./rfcs/021-metering.md) and [the discussion on
batch format is
```json
{ "events" : [metric1, metric2, ...]]}
{ "events" : [metric1, metric2, ...] }
```
See metric format examples below.
@@ -49,11 +49,13 @@ Size of the remote storage (S3) directory.
This is an absolute, per-tenant metric.
- `timeline_logical_size`
Logical size of the data in the timeline
Logical size of the data in the timeline.
This is an absolute, per-timeline metric.
- `synthetic_storage_size`
Size of all tenant's branches including WAL
Size of all tenant's branches including WAL.
This is the same metric that `tenant/{tenant_id}/size` endpoint returns.
This is an absolute, per-tenant metric.
@@ -106,10 +108,10 @@ This is an incremental, per-endpoint metric.
```
The metric is incremental, so the value is the difference between the current and the previous value.
If there is no previous value, the value, the value is the current value and the `start_time` equals `stop_time`.
If there is no previous value, the value is the current value and the `start_time` equals `stop_time`.
### TODO
- [ ] Handle errors better: currently if one tenant fails to gather metrics, the whole iteration fails and metrics are not sent for any tenant.
- [ ] Add retries
- [ ] Tune the interval
- [ ] Tune the interval

View File

@@ -169,6 +169,8 @@ pub struct TenantDescribeResponseShard {
pub is_pending_compute_notification: bool,
/// A shard split is currently underway
pub is_splitting: bool,
/// A timeline is being imported into this tenant
pub is_importing: bool,
pub scheduling_policy: ShardSchedulingPolicy,

View File

@@ -21,7 +21,7 @@ use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::models::{
self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -29,7 +29,7 @@ use pageserver_api::models::{
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamProtocolVersion, PagestreamRequest, TenantState,
};
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::reltag::SlruKind;
use pageserver_api::shard::TenantShardId;
use postgres_backend::{
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
@@ -1035,10 +1035,10 @@ impl PageServerHandler {
// avoid a somewhat costly Span::record() by constructing the entire span in one go.
macro_rules! mkspan {
(before shard routing) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, not_modified_since_lsn = %req.hdr.not_modified_since)
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
}};
($shard_id:expr) => {{
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, not_modified_since_lsn = %req.hdr.not_modified_since, shard_id = %$shard_id)
tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
}};
}
@@ -1140,10 +1140,9 @@ impl PageServerHandler {
.await?;
// We're holding the Handle
let last_record_lsn = shard.get_last_record_lsn();
let effective_request_lsn = match Self::effective_request_lsn(
&shard,
last_record_lsn,
shard.get_last_record_lsn(),
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
@@ -1154,22 +1153,6 @@ impl PageServerHandler {
}
};
let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
let trouble_rel = RelTag {
spcnode: trouble_key.field2,
dbnode: trouble_key.field3,
relnode: trouble_key.field4,
forknum: trouble_key.field5,
};
if req.rel == trouble_rel {
tracing::info!(
request_lsn=%req.hdr.request_lsn,
not_modified_since_lsn=%req.hdr.not_modified_since,
%last_record_lsn,
"effective_request_lsn for {} is {}", key, effective_request_lsn
);
}
BatchedFeMessage::GetPage {
span,
shard: shard.downgrade(),

View File

@@ -185,7 +185,6 @@ impl Timeline {
pending_directory_entries: Vec::new(),
pending_metadata_bytes: 0,
lsn,
extra_log: false,
}
}
@@ -266,14 +265,6 @@ impl Timeline {
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
HashMap::with_capacity(pages.len());
let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
let trouble_rel = RelTag {
spcnode: trouble_key.field2,
dbnode: trouble_key.field3,
relnode: trouble_key.field4,
forknum: trouble_key.field5,
};
for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
@@ -284,14 +275,6 @@ impl Timeline {
continue;
}
if *tag == trouble_rel {
tracing::info!(
"Getting rel size for {} at LSN {}",
rel_block_to_key(*tag, *blknum),
lsn
);
}
let nblocks = match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
@@ -1419,8 +1402,6 @@ pub struct DatadirModification<'a> {
/// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
pending_metadata_bytes: usize,
extra_log: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -1639,32 +1620,6 @@ impl DatadirModification<'_> {
) -> Result<(), WalIngestError> {
let mut gaps_at_lsns = Vec::default();
let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
let trouble_rel = RelTag {
spcnode: trouble_key.field2,
dbnode: trouble_key.field3,
relnode: trouble_key.field4,
forknum: trouble_key.field5,
};
for meta in batch.metadata.iter().filter_map(|m| match m {
ValueMeta::Serialized(serialized_value_meta) => Some(serialized_value_meta),
ValueMeta::Observed(_) => None,
}) {
let key = Key::from_compact(meta.key);
let rel = RelTag {
spcnode: key.field2,
dbnode: key.field3,
relnode: key.field4,
forknum: key.field5,
};
if rel == trouble_rel {
tracing::info!("Put for {key} at LSN {}", meta.lsn);
self.extra_log = true;
}
}
for meta in batch.metadata.iter() {
let key = Key::from_compact(meta.key());
let (rel, blkno) = key
@@ -1995,19 +1950,6 @@ impl DatadirModification<'_> {
"invalid relnode"
)))?;
}
let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
let trouble_rel = RelTag {
spcnode: trouble_key.field2,
dbnode: trouble_key.field3,
relnode: trouble_key.field4,
forknum: trouble_key.field5,
};
if rel == trouble_rel {
self.extra_log = true;
}
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
@@ -2027,10 +1969,6 @@ impl DatadirModification<'_> {
true
};
if rel == trouble_rel {
tracing::info!(%dbdir_exists, "Maybe created db dir for {} at LSN {}", trouble_key.to_compact(), self.lsn);
}
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if !dbdir_exists {
// Create the RelDirectory
@@ -2076,10 +2014,6 @@ impl DatadirModification<'_> {
}
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
if rel == trouble_rel {
tracing::info!(%dbdir_exists, "Created v2 rel for {} at LSN {}", trouble_key.to_compact(), self.lsn);
}
} else {
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
@@ -2095,10 +2029,6 @@ impl DatadirModification<'_> {
rel_dir_key,
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
);
if rel == trouble_rel {
tracing::info!(%dbdir_exists, "Created v1 rel for {} at LSN {}", trouble_key.to_compact(), self.lsn);
}
}
// Put size
@@ -2533,19 +2463,11 @@ impl DatadirModification<'_> {
};
if let Some(batch) = maybe_batch {
if self.extra_log {
tracing::info!(
"Flushing batch with max_lsn={}. Last record LSN is {}",
batch.max_lsn,
self.tline.get_last_record_lsn()
);
} else {
tracing::debug!(
"Flushing batch with max_lsn={}. Last record LSN is {}",
batch.max_lsn,
self.tline.get_last_record_lsn()
);
}
tracing::debug!(
"Flushing batch with max_lsn={}. Last record LSN is {}",
batch.max_lsn,
self.tline.get_last_record_lsn()
);
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
@@ -2579,14 +2501,6 @@ impl DatadirModification<'_> {
self.pending_metadata_bytes = 0;
if self.extra_log {
tracing::info!(
"Flushed batch. Last record LSN is {}",
self.tline.get_last_record_lsn()
);
self.extra_log = false;
}
Ok(())
}
@@ -2677,26 +2591,6 @@ impl DatadirModification<'_> {
.pending_data_batch
.get_or_insert_with(SerializedValueBatch::default);
batch.put(key, val, self.lsn);
let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
let trouble_rel = RelTag {
spcnode: trouble_key.field2,
dbnode: trouble_key.field3,
relnode: trouble_key.field4,
forknum: trouble_key.field5,
};
let key = Key::from_compact(key);
let rel = RelTag {
spcnode: key.field2,
dbnode: key.field3,
relnode: key.field4,
forknum: key.field5,
};
if rel == trouble_rel {
tracing::info!("Put for {key} at LSN {}", self.lsn);
self.extra_log = true;
}
}
fn put_metadata(&mut self, key: CompactKey, val: Value) {
@@ -2723,14 +2617,6 @@ impl DatadirModification<'_> {
if key == CHECKPOINT_KEY.to_compact() {
tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
}
let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF")
.unwrap()
.to_compact();
if key == trouble_key {
tracing::info!("Put for {trouble_key} at LSN {}", self.lsn);
self.extra_log = true;
}
}
fn delete(&mut self, key_range: Range<Key>) {

View File

@@ -3816,6 +3816,24 @@ impl TenantShard {
MaybeDeletedIndexPart::IndexPart(p) => p,
};
// A shard split may not take place while a timeline import is on-going
// for the tenant. Timeline imports run as part of each tenant shard
// and rely on the sharding scheme to split the work among pageservers.
// If we were to split in the middle of this process, we would have to
// either ensure that it's driven to completion on the old shard set
// or transfer it to the new shard set. It's technically possible, but complex.
match index_part.import_pgdata {
Some(ref import) if !import.is_done() => {
anyhow::bail!(
"Cannot split due to import with idempotency key: {:?}",
import.idempotency_key()
);
}
Some(_) | None => {
// fallthrough
}
}
for child_shard in child_shards {
tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
upload_index_part(

View File

@@ -36,6 +36,8 @@ DATA = \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \

View File

@@ -88,9 +88,6 @@ typedef PGAlignedBlock PGIOAlignedBlock;
page_server_api *page_server;
static uint32 local_request_counter;
#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
/*
* Various settings related to prompt (fast) handling of PageStream responses
* at any CHECK_FOR_INTERRUPTS point.
@@ -788,6 +785,27 @@ prefetch_read(PrefetchRequest *slot)
}
}
/*
* Wait completion of previosly registered prefetch request.
* Prefetch result should be placed in LFC by prefetch_wait_for.
*/
bool
communicator_prefetch_receive(BufferTag tag)
{
PrfHashEntry *entry;
PrefetchRequest hashkey;
hashkey.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
{
prefetch_set_unused(entry->slot->my_ring_index);
return true;
}
return false;
}
/*
* Disconnect hook - drop prefetches when the connection drops
*
@@ -906,7 +924,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
NeonGetPageRequest request = {
.hdr.tag = T_NeonGetPageRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
.forknum = slot->buftag.forkNum,
@@ -915,8 +932,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
slot->reqid = request.hdr.reqid;
if (force_request_lsns)
slot->request_lsns = *force_request_lsns;
else
@@ -934,6 +949,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
/* loop */
}
slot->reqid = request.hdr.reqid;
/* update prefetch state */
MyPState->n_requests_inflight += 1;
@@ -1937,7 +1953,6 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
{
NeonExistsRequest request = {
.hdr.tag = T_NeonExistsRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
@@ -2212,7 +2227,6 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
{
NeonNblocksRequest request = {
.hdr.tag = T_NeonNblocksRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
@@ -2285,7 +2299,6 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
{
NeonDbSizeRequest request = {
.hdr.tag = T_NeonDbSizeRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.dbNode = dbNode,
@@ -2353,7 +2366,6 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
request = (NeonGetSlruSegmentRequest) {
.hdr.tag = T_NeonGetSlruSegmentRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.kind = kind,

View File

@@ -37,6 +37,8 @@ extern int communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum,
BlockNumber nblocks, void **buffers, bits8 *mask);
extern void communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask);
extern bool communicator_prefetch_receive(BufferTag tag);
extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
neon_request_lsns *request_lsns,
void *buffer);

View File

@@ -25,6 +25,7 @@
#include "pgstat.h"
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include RELFILEINFO_HDR
#include "storage/buf_internals.h"
#include "storage/fd.h"
@@ -32,6 +33,8 @@
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
@@ -46,6 +49,8 @@
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#include "communicator.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
@@ -87,14 +92,15 @@
* 1Mb chunks can reduce hash map size to 320Mb.
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
/*
* Smaller chunk seems to be better for OLTP workload
*/
// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */
#define MAX_BLOCKS_PER_CHUNK_LOG 7 /* 1Mb chunk */
#define MAX_BLOCKS_PER_CHUNK (1 << MAX_BLOCKS_PER_CHUNK_LOG)
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log))
#define BLOCK_TO_CHUNK_NUN(blkno) ((blkno) >> lfc_chunk_size_log)
#define BLOCK_TO_CHUNK_OFF(blkno) ((blkno) & (lfc_blocks_per_chunk-1))
/*
* Blocks are read or written to LFC file outside LFC critical section.
@@ -119,16 +125,26 @@ typedef struct FileCacheEntry
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 state[(BLOCKS_PER_CHUNK + 31) / 32 * 2]; /* two bits per block */
dlist_node list_node; /* LRU/holes list node */
uint32 state[FLEXIBLE_ARRAY_MEMBER]; /* two bits per block */
} FileCacheEntry;
#define FILE_CACHE_ENRTY_SIZE MAXALIGN(offsetof(FileCacheEntry, state) + (lfc_blocks_per_chunk*2+31)/32*4)
#define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3)
#define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2))
#define N_COND_VARS 64
#define CV_WAIT_TIMEOUT 10
#define MAX_PREWARM_WORKERS 8
typedef struct PrewarmWorkerState
{
uint32 prewarmed_pages;
uint32 skipped_pages;
TimestampTz completed;
} PrewarmWorkerState;
typedef struct FileCacheControl
{
uint64 generation; /* generation is needed to handle correct hash
@@ -136,6 +152,7 @@ typedef struct FileCacheControl
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
uint32 used_pages; /* number of used pages */
uint32 pinned; /* number of pinned chunks */
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
@@ -149,23 +166,54 @@ typedef struct FileCacheControl
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS];
size_t n_prewarm_workers;
size_t n_prewarm_entries;
size_t total_prewarm_pages;
size_t prewarm_batch;
bool prewarm_active;
bool prewarm_canceled;
dsm_handle prewarm_lfc_state_handle;
} FileCacheControl;
bool lfc_store_prefetch_result;
#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc
static HTAB *lfc_hash;
typedef struct FileCacheState
{
int32 vl_len_; /* varlena header (do not touch directly!) */
uint32 magic;
uint32 n_chunks;
uint32 n_pages;
uint16 chunk_size_log;
BufferTag chunks[FLEXIBLE_ARRAY_MEMBER];
/* followed by bitmap */
} FileCacheState;
#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks])
#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8)
#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8)
static HTAB *lfc_hash;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static bool lfc_do_prewarm;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
bool lfc_store_prefetch_result;
bool lfc_prewarm_update_ws_estimation;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
/*
@@ -206,7 +254,9 @@ lfc_switch_off(void)
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->pinned = 0;
lfc_ctl->used = 0;
lfc_ctl->used_pages = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
@@ -296,7 +346,7 @@ lfc_shmem_startup(void)
lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(FileCacheEntry);
info.entrysize = FILE_CACHE_ENRTY_SIZE;
/*
* n_chunks+1 because we add new element to hash table before eviction
@@ -342,7 +392,7 @@ lfc_shmem_request(void)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, sizeof(FileCacheEntry)));
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestNamedLWLockTranche("lfc_lock", 1);
}
@@ -359,6 +409,24 @@ is_normal_backend(void)
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
}
static bool
lfc_check_chunk_size(int *newval, void **extra, GucSource source)
{
if (*newval & (*newval - 1))
{
elog(ERROR, "LFC chunk size should be pwer of two");
return false;
}
return true;
}
static void
lfc_change_chunk_size(int newval, void* extra)
{
lfc_chunk_size_log = pg_ceil_log2_32(newval);
}
static bool
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
@@ -415,11 +483,11 @@ lfc_change_limit_hook(int newval, void *extra)
CriticalAssert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * lfc_blocks_per_chunk * BLCKSZ, lfc_blocks_per_chunk * BLCKSZ) < 0)
neon_log(LOG, "Failed to punch hole in file: %m");
#endif
/* We remove the old entry, and re-enter a hole to the hash table */
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
bool is_page_cached = GET_STATE(victim, i) == AVAILABLE;
lfc_ctl->used_pages -= is_page_cached;
@@ -471,6 +539,17 @@ lfc_init(void)
NULL,
NULL);
DefineCustomBoolVariable("neon.prewarm_update_ws_estimation",
"Consider prewarmed pages for working set estimation",
NULL,
&lfc_prewarm_update_ws_estimation,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -508,6 +587,45 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed chunks",
NULL,
&lfc_prewarm_limit,
INT_MAX, /* no limit by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.chunk_size",
"LFC chunk size in blocks (should be power of two)",
NULL,
&lfc_blocks_per_chunk,
MAX_BLOCKS_PER_CHUNK,
1,
MAX_BLOCKS_PER_CHUNK,
PGC_POSTMASTER,
GUC_UNIT_BLOCKS,
lfc_check_chunk_size,
lfc_change_chunk_size,
NULL);
if (lfc_max_size == 0)
return;
@@ -521,6 +639,311 @@ lfc_init(void)
#endif
}
static FileCacheState*
lfc_get_state(size_t max_entries)
{
FileCacheState* fcs = NULL;
if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */
return NULL;
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
size_t i = 0;
uint8* bitmap;
size_t n_pages = 0;
size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned);
size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries);
fcs = (FileCacheState*)palloc0(state_size);
SET_VARSIZE(fcs, state_size);
fcs->magic = FILE_CACHE_STATE_MAGIC;
fcs->chunk_size_log = lfc_chunk_size_log;
fcs->n_chunks = n_entries;
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
fcs->chunks[i] = entry->key;
for (int j = 0; j < lfc_blocks_per_chunk; j++)
{
if (GET_STATE(entry, j) != UNAVAILABLE)
{
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
n_pages += 1;
}
}
if (++i == n_entries)
break;
}
Assert(i == n_entries);
fcs->n_pages = n_pages;
Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages);
elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages);
}
LWLockRelease(lfc_lock);
return fcs;
}
/*
* Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock
* and avoid race conditions with other backends.
*/
static void
lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
{
size_t fcs_chunk_size_log;
size_t n_entries;
size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size);
size_t fcs_size;
dsm_segment *seg;
BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS];
if (!lfc_ensure_opened())
return;
if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
if (n_workers > MAX_PREWARM_WORKERS)
{
elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS);
}
if (fcs == NULL || fcs->n_chunks == 0)
{
elog(LOG, "LFC: nothing to prewarm");
return;
}
if (fcs->magic != FILE_CACHE_STATE_MAGIC)
{
elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
}
fcs_size = VARSIZE(fcs);
if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size)
{
elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs));
}
fcs_chunk_size_log = fcs->chunk_size_log;
if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG)
{
elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log);
}
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
Assert(n_entries != 0);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
elog(LOG, "LFC: skip prewarm because LFC is already filled");
LWLockRelease(lfc_lock);
return;
}
if (lfc_ctl->prewarm_active)
{
LWLockRelease(lfc_lock);
elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
}
lfc_ctl->n_prewarm_entries = n_entries;
lfc_ctl->n_prewarm_workers = n_workers;
lfc_ctl->prewarm_active = true;
lfc_ctl->prewarm_canceled = false;
lfc_ctl->prewarm_batch = prewarm_batch;
memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState));
LWLockRelease(lfc_lock);
/* Calculate total number of pages to be prewarmed */
lfc_ctl->total_prewarm_pages = fcs->n_pages;
seg = dsm_create(fcs_size, 0);
memcpy(dsm_segment_address(seg), fcs, fcs_size);
lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg);
/* Spawn background workers */
for (uint32 i = 0; i < n_workers; i++)
{
BackgroundWorker worker = {0};
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy(worker.bgw_library_name, "neon");
strcpy(worker.bgw_function_name, "lfc_prewarm_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "LFC prewarm worker %d", i+1);
strcpy(worker.bgw_type, "LFC prewarm worker");
worker.bgw_main_arg = Int32GetDatum(i);
/* must set notify PID to wait for shutdown */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &bgw_handle[i]))
{
ereport(LOG,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("LFC: registering dynamic bgworker prewarm failed"),
errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
n_workers = i;
lfc_ctl->prewarm_canceled = true;
break;
}
}
for (uint32 i = 0; i < n_workers; i++)
{
while (true)
{
PG_TRY();
{
BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]);
if (status != BGWH_STOPPED && status != BGWH_POSTMASTER_DIED)
{
elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status);
}
break;
}
PG_CATCH();
{
elog(LOG, "LFC: cancel prewarm");
lfc_ctl->prewarm_canceled = true;
}
PG_END_TRY();
}
if (!lfc_ctl->prewarm_workers[i].completed)
{
/* Background worker doesn't set completion time: it means that it was abnormally terminated */
elog(LOG, "LFC: prewarm worker %d failed", i+1);
/* Set completion time to prevent get_prewarm_info from considering this worker as active */
lfc_ctl->prewarm_workers[i].completed = GetCurrentTimestamp();
}
}
dsm_detach(seg);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_ctl->prewarm_active = false;
LWLockRelease(lfc_lock);
}
void
lfc_prewarm_main(Datum main_arg)
{
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
size_t fcs_chunk_size_log;
size_t max_prefetch_pages;
size_t prewarm_batch;
size_t n_workers;
dsm_segment *seg;
FileCacheState* fcs;
uint8* bitmap;
BufferTag tag;
PrewarmWorkerState* ws;
uint32 worker_id = DatumGetInt32(main_arg);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle);
if (seg == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not map dynamic shared memory segment")));
fcs = (FileCacheState*) dsm_segment_address(seg);
prewarm_batch = lfc_ctl->prewarm_batch;
fcs_chunk_size_log = fcs->chunk_size_log;
n_workers = lfc_ctl->n_prewarm_workers;
max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log;
ws = &lfc_ctl->prewarm_workers[worker_id];
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
/* enable prefetch in LFC */
lfc_store_prefetch_result = true;
lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */
elog(LOG, "LFC: worker %d start prewarming", worker_id);
while (true)
{
if (snd_idx < max_prefetch_pages && !lfc_ctl->prewarm_canceled)
{
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* If there are multiple workers, split chunks between them */
snd_idx += 1 << fcs_chunk_size_log;
}
else
{
if (BITMAP_ISSET(bitmap, snd_idx))
{
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
{
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
n_sent += 1;
}
else
{
ws->skipped_pages += 1;
BITMAP_CLR(bitmap, snd_idx);
}
}
snd_idx += 1;
}
}
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages || lfc_ctl->prewarm_canceled)
{
if (n_received == n_sent && (snd_idx == max_prefetch_pages || lfc_ctl->prewarm_canceled))
{
break;
}
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* Skip chunks processed by other workers */
rcv_idx += 1 << fcs_chunk_size_log;
continue;
}
/* Locate next block to prefetch */
while (!BITMAP_ISSET(bitmap, rcv_idx))
{
rcv_idx += 1;
}
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
if (communicator_prefetch_receive(tag))
{
ws->prewarmed_pages += 1;
}
else
{
ws->skipped_pages += 1;
}
rcv_idx += 1;
n_received += 1;
}
}
Assert(n_sent == n_received);
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -530,7 +953,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
{
BufferTag tag;
FileCacheEntry *entry;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
bool found = false;
uint32 hash;
@@ -539,7 +962,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno - chunk_offs;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
@@ -577,9 +1000,9 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
LWLockAcquire(lfc_lock, LW_SHARED);
@@ -590,12 +1013,12 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
while (true)
{
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
int this_chunk = Min(nblocks - i, lfc_blocks_per_chunk - chunk_offs);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
{
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
for (; chunk_offs < lfc_blocks_per_chunk && i < nblocks; chunk_offs++, i++)
{
if (GET_STATE(entry, chunk_offs) != UNAVAILABLE)
{
@@ -619,9 +1042,9 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* Prepare for the next iteration. We don't unlock here, as that'd
* probably be more expensive than the gains it'd get us.
*/
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno + i);
tag.blockNum = (blkno + i) - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
}
LWLockRelease(lfc_lock);
@@ -696,9 +1119,9 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int8 chunk_mask[BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
uint8 chunk_mask[MAX_BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs);
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
@@ -786,8 +1209,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
generation = lfc_ctl->generation;
entry_offset = entry->offset;
@@ -836,7 +1261,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
/* chunk offset (# of pages) into the LFC file */
off_t first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK;
off_t first_read_offset = (off_t) entry_offset * lfc_blocks_per_chunk;
int nwrite = iov_last_used - first_block_in_chunk_read;
/* offset of first IOV */
first_read_offset += chunk_offs + first_block_in_chunk_read;
@@ -884,7 +1309,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
}
else
{
@@ -954,14 +1382,17 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
* If we can't (e.g. because all other slots are being accessed)
* then we will remove this entry from the hash and continue
* on to the next chunk, as we may not exceed the limit.
*
* While prewarming LFC we do not want to replcate existed entries,
* so we just stop prewarm is LFC cache is full.
*/
else if (!dlist_is_empty(&lfc_ctl->lru))
else if (!dlist_is_empty(&lfc_ctl->lru) && !lfc_do_prewarm)
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node,
dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
bool is_page_cached = GET_STATE(victim, i) == AVAILABLE;
lfc_ctl->used_pages -= is_page_cached;
@@ -979,14 +1410,15 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
/* Can't add this chunk - we don't have the space for it */
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
HASH_REMOVE, NULL);
lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */
return false;
}
entry->access_count = 1;
entry->hash = hash;
lfc_ctl->pinned += 1;
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
SET_STATE(entry, i, UNAVAILABLE);
return true;
@@ -1031,7 +1463,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
FileCacheBlockState state;
XLogRecPtr lwlsn;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
@@ -1041,7 +1473,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -1052,7 +1484,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LWLockRelease(lfc_lock);
return false;
}
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
if (lwlsn > lsn)
@@ -1065,9 +1497,11 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (lfc_prewarm_update_ws_estimation)
{
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
if (found)
{
state = GET_STATE(entry, chunk_offs);
@@ -1081,7 +1515,10 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* operation
*/
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
}
else
{
@@ -1106,7 +1543,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwrite(lfc_desc, buffer, BLCKSZ,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
@@ -1132,7 +1569,10 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
inc_page_cache_write_wait(time_spent_us);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
state = GET_STATE(entry, chunk_offs);
if (state == REQUESTED) {
@@ -1199,8 +1639,8 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs);
instr_time io_start, io_end;
ConditionVariable* cv;
@@ -1212,7 +1652,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
iov[i].iov_len = BLCKSZ;
}
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -1232,7 +1672,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* operation
*/
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
}
else
{
@@ -1285,7 +1728,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
@@ -1312,7 +1755,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
inc_page_cache_write_wait(time_spent_us);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
for (int i = 0; i < blocks_in_chunk; i++)
{
@@ -1438,7 +1884,12 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
break;
case 8:
key = "file_cache_chunk_size_pages";
value = BLOCKS_PER_CHUNK;
value = lfc_blocks_per_chunk;
break;
case 9:
key = "file_cache_chunks_pinned";
if (lfc_ctl)
value = lfc_ctl->pinned;
break;
default:
SRF_RETURN_DONE(funcctx);
@@ -1566,7 +2017,7 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Skip hole tags */
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
}
}
@@ -1594,13 +2045,13 @@ local_cache_pages(PG_FUNCTION_ARGS)
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
if (GET_STATE(entry, i) == AVAILABLE)
{
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].pageoffs = entry->offset * lfc_blocks_per_chunk + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
@@ -1684,3 +2135,82 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum
get_local_cache_state(PG_FUNCTION_ARGS)
{
size_t max_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheState* fcs = lfc_get_state(max_entries);
if (fcs != NULL)
PG_RETURN_BYTEA_P((bytea*)fcs);
else
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(prewarm_local_cache);
Datum
prewarm_local_cache(PG_FUNCTION_ARGS)
{
bytea* state = PG_GETARG_BYTEA_PP(0);
uint32 n_workers = PG_GETARG_INT32(1);
FileCacheState* fcs = (FileCacheState*)state;
lfc_prewarm(fcs, n_workers);
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_prewarm_info);
Datum
get_prewarm_info(PG_FUNCTION_ARGS)
{
Datum values[4];
bool nulls[4];
TupleDesc tupdesc;
uint32 prewarmed_pages = 0;
uint32 skipped_pages = 0;
uint32 active_workers = 0;
uint32 total_pages;
size_t n_workers;
if (lfc_size_limit == 0)
PG_RETURN_NULL();
LWLockAcquire(lfc_lock, LW_SHARED);
if (!lfc_ctl || lfc_ctl->n_prewarm_workers == 0)
{
LWLockRelease(lfc_lock);
PG_RETURN_NULL();
}
n_workers = lfc_ctl->n_prewarm_workers;
total_pages = lfc_ctl->total_prewarm_pages;
for (size_t i = 0; i < n_workers; i++)
{
PrewarmWorkerState* ws = &lfc_ctl->prewarm_workers[i];
prewarmed_pages += ws->prewarmed_pages;
skipped_pages += ws->skipped_pages;
active_workers += ws->completed != 0;
}
LWLockRelease(lfc_lock);
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "prewarmed_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "skipped_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "active_workers", INT4OID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
MemSet(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(total_pages);
values[1] = Int32GetDatum(prewarmed_pages);
values[2] = Int32GetDatum(skipped_pages);
values[3] = Int32GetDatum(active_workers);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -33,6 +33,7 @@ extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

View File

@@ -48,7 +48,6 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
enum NeonComputeMode {
CP_MODE_PRIMARY = 0,
CP_MODE_REPLICA,
@@ -167,6 +166,9 @@ typedef struct
WaitEventSet *wes_read;
} PageServer;
static uint32 local_request_counter;
#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);
@@ -994,6 +996,7 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
pageserver_conn = NULL;
}
request->reqid = GENERATE_REQUEST_ID();
req_buff = nm_pack_request(request);
/*

View File

@@ -0,0 +1,22 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
CREATE FUNCTION get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer)
RETURNS record
AS 'MODULE_PATHNAME', 'get_prewarm_info'
LANGUAGE C STRICT
PARALLEL SAFE;
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_local_cache_state'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION prewarm_local_cache(state bytea, n_workers integer default 1)
RETURNS void
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;

View File

@@ -0,0 +1,7 @@
DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_pages integer, out skipped_pages integer, out active_workers integer);
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);

View File

@@ -56,7 +56,6 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);

View File

@@ -65,7 +65,6 @@ typedef enum {
SLRU_MULTIXACT_OFFSETS
} SlruKind;
/*--
* supertype of all the Neon*Request structs below.
*
@@ -187,6 +186,7 @@ typedef struct
{
/*
* Send this request to the PageServer associated with this shard.
* This function assigns request_id to the request which can be extracted by caller from request struct.
*/
bool (*send) (shardno_t shard_no, NeonRequest * request);
/*
@@ -281,4 +281,5 @@ extern void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumb
extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size);
extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum);
#endif /* PAGESTORE_CLIENT_H */

View File

@@ -11,6 +11,7 @@ bench = []
anyhow.workspace = true
async-stream.workspace = true
bytes.workspace = true
camino.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
futures.workspace = true
@@ -19,12 +20,14 @@ futures-util.workspace = true
humantime.workspace = true
hyper = { workspace = true, features = ["full"] }
http-body-util.workspace = true
http-utils.workspace = true
hyper-util = "0.1"
once_cell.workspace = true
parking_lot.workspace = true
prost.workspace = true
tonic.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-rustls.workspace = true
tracing.workspace = true
metrics.workspace = true
utils.workspace = true

View File

@@ -17,10 +17,13 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use camino::Utf8PathBuf;
use clap::{Parser, command};
use futures::future::OptionFuture;
use futures_core::Stream;
use futures_util::StreamExt;
use http_body_util::Full;
use http_utils::tls_certs::ReloadingCertificateResolver;
use hyper::body::Incoming;
use hyper::header::CONTENT_TYPE;
use hyper::service::service_fn;
@@ -38,7 +41,7 @@ use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, parse_proto_ttid};
use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, parse_proto_ttid};
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
@@ -59,12 +62,25 @@ project_build_tag!(BUILD_TAG);
const DEFAULT_CHAN_SIZE: usize = 32;
const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
const DEFAULT_SSL_KEY_FILE: &str = "server.key";
const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
#[derive(Parser, Debug)]
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
#[clap(group(
clap::ArgGroup::new("listen-addresses")
.required(true)
.multiple(true)
.args(&["listen_addr", "listen_https_addr"]),
))]
struct Args {
/// Endpoint to listen on.
#[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
listen_addr: SocketAddr,
/// Endpoint to listen HTTP on.
#[arg(short, long)]
listen_addr: Option<SocketAddr>,
/// Endpoint to listen HTTPS on.
#[arg(long)]
listen_https_addr: Option<SocketAddr>,
/// Size of the queue to the per timeline subscriber.
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
timeline_chan_size: usize,
@@ -72,11 +88,20 @@ struct Args {
#[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
all_keys_chan_size: usize,
/// HTTP/2 keepalive interval.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
http2_keepalive_interval: Duration,
/// Format for logging, either 'plain' or 'json'.
#[arg(long, default_value = "plain")]
log_format: String,
/// Path to a file with certificate's private key for https API.
#[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
ssl_key_file: Utf8PathBuf,
/// Path to a file with a X509 certificate for https API.
#[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
ssl_cert_reload_period: Duration,
}
/// Id of publisher for registering in maps
@@ -674,12 +699,50 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
let http_listener = match &args.listen_addr {
Some(addr) => {
info!("listening HTTP on {}", addr);
Some(TcpListener::bind(addr).await?)
}
None => None,
};
let (https_listener, tls_acceptor) = match &args.listen_https_addr {
Some(addr) => {
let listener = TcpListener::bind(addr).await?;
let cert_resolver = ReloadingCertificateResolver::new(
"main",
&args.ssl_key_file,
&args.ssl_cert_file,
args.ssl_cert_reload_period,
)
.await?;
let mut tls_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
// Tonic is HTTP/2 only and it negotiates it with ALPN.
tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
info!("listening HTTPS on {}", addr);
(Some(listener), Some(acceptor))
}
None => (None, None),
};
// grpc is served along with http1 for metrics on a single port, hence we
// don't use tonic's Server.
let tcp_listener = TcpListener::bind(&args.listen_addr).await?;
info!("listening on {}", &args.listen_addr);
loop {
let (stream, addr) = match tcp_listener.accept().await {
let (conn, is_https) = tokio::select! {
Some(conn) = OptionFuture::from(http_listener.as_ref().map(|l| l.accept())) => (conn, false),
Some(conn) = OptionFuture::from(https_listener.as_ref().map(|l| l.accept())) => (conn, true),
};
let (tcp_stream, addr) = match conn {
Ok(v) => v,
Err(e) => {
info!("couldn't accept connection: {e}");
@@ -734,13 +797,32 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
.await;
let tls_acceptor = tls_acceptor.clone();
tokio::task::spawn(async move {
let res = builder
.serve_connection(TokioIo::new(stream), service_fn_)
.await;
let res = if is_https {
let tls_acceptor =
tls_acceptor.expect("tls_acceptor is set together with https_listener");
let tls_stream = match tls_acceptor.accept(tcp_stream).await {
Ok(tls_stream) => tls_stream,
Err(e) => {
info!("error accepting TLS connection from {addr}: {e}");
return;
}
};
builder
.serve_connection(TokioIo::new(tls_stream), service_fn_)
.await
} else {
builder
.serve_connection(TokioIo::new(tcp_stream), service_fn_)
.await
};
if let Err(e) = res {
info!("error serving connection from {addr}: {e}");
info!(%is_https, "error serving connection from {addr}: {e}");
}
});
}

View File

@@ -196,7 +196,7 @@ struct Cli {
ssl_cert_reload_period: humantime::Duration,
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<PathBuf>,
ssl_ca_file: Option<Utf8PathBuf>,
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
/// the compute notification directly (instead of via control plane).

View File

@@ -134,6 +134,7 @@ pub(crate) enum DatabaseOperation {
UpdateTimelineImport,
DeleteTimelineImport,
ListTimelineImports,
IsTenantImportingTimeline,
}
#[must_use]
@@ -1641,9 +1642,7 @@ impl Persistence {
.await
}
pub(crate) async fn list_complete_timeline_imports(
&self,
) -> DatabaseResult<Vec<TimelineImport>> {
pub(crate) async fn list_timeline_imports(&self) -> DatabaseResult<Vec<TimelineImport>> {
use crate::schema::timeline_imports::dsl;
let persistent = self
.with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| {
@@ -1660,10 +1659,7 @@ impl Persistence {
.map(TimelineImport::from_persistent)
.collect();
match imports {
Ok(ok) => Ok(ok
.into_iter()
.filter(|import| import.is_complete())
.collect()),
Ok(ok) => Ok(ok.into_iter().collect()),
Err(err) => Err(DatabaseError::Logical(format!(
"failed to deserialize import: {err}"
))),
@@ -1773,6 +1769,25 @@ impl Persistence {
})
.await
}
pub(crate) async fn is_tenant_importing_timeline(
&self,
tenant_id: TenantId,
) -> DatabaseResult<bool> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::IsTenantImportingTimeline, move |conn| {
Box::pin(async move {
let imports: i64 = dsl::timeline_imports
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
.count()
.get_result(conn)
.await?;
Ok(imports > 0)
})
})
.await
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {

View File

@@ -97,7 +97,9 @@ use crate::tenant_shard::{
ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
use crate::timeline_import::{
ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient,
};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -878,15 +880,33 @@ impl Service {
});
}
// Fetch the list of completed imports and attempt to finalize them in the background.
// This handles the case where the previous storage controller instance shut down
// whilst finalizing imports.
let complete_imports = self.persistence.list_complete_timeline_imports().await;
match complete_imports {
Ok(ok) => {
// Reconcile the timeline imports:
// 1. Mark each tenant shard of tenants with an importing timeline as importing.
// 2. Finalize the completed imports in the background. This handles the case where
// the previous storage controller instance shut down whilst finalizing imports.
let imports = self.persistence.list_timeline_imports().await;
match imports {
Ok(mut imports) => {
{
let mut locked = self.inner.write().unwrap();
for import in &imports {
locked
.tenants
.range_mut(TenantShardId::tenant_range(import.tenant_id))
.for_each(|(_id, shard)| {
shard.importing = TimelineImportState::Importing
});
}
}
imports.retain(|import| import.is_complete());
tokio::task::spawn({
let finalize_imports_self = self.clone();
async move { finalize_imports_self.finalize_timeline_imports(ok).await }
async move {
finalize_imports_self
.finalize_timeline_imports(imports)
.await
}
});
}
Err(err) => {
@@ -3772,6 +3792,22 @@ impl Service {
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let is_import = create_req.is_import();
if is_import {
// Ensure that there is no split on-going.
// [`Self::tenant_shard_split`] holds the exclusive tenant lock
// for the duration of the split, but here we handle the case
// where we restarted and the split is being aborted.
let locked = self.inner.read().unwrap();
let splitting = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.any(|(_id, shard)| shard.splitting != SplitState::Idle);
if splitting {
return Err(ApiError::Conflict("Tenant is splitting shard".to_string()));
}
}
let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;
@@ -3809,6 +3845,14 @@ impl Service {
.context("timeline import insert")
.map_err(ApiError::InternalServerError)?;
// Set the importing flag on the tenant shards
self.inner
.write()
.unwrap()
.tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Importing);
match inserted {
true => {
tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
@@ -3931,6 +3975,13 @@ impl Service {
tracing::warn!("Failed to delete timeline import entry from database: {err}");
}
self.inner
.write()
.unwrap()
.tenants
.range_mut(TenantShardId::tenant_range(import.tenant_id))
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
// https://github.com/neondatabase/neon/issues/11569
@@ -4914,6 +4965,7 @@ impl Service {
is_reconciling: shard.reconciler.is_some(),
is_pending_compute_notification: shard.pending_compute_notification,
is_splitting: matches!(shard.splitting, SplitState::Splitting),
is_importing: shard.importing == TimelineImportState::Importing,
scheduling_policy: shard.get_scheduling_policy(),
preferred_az_id: shard.preferred_az().map(ToString::to_string),
})
@@ -5404,6 +5456,27 @@ impl Service {
.enter()
.map_err(|_| ApiError::ShuttingDown)?;
// Timeline imports on the pageserver side can't handle shard-splits.
// If the tenant is importing a timeline, dont't shard split it.
match self
.persistence
.is_tenant_importing_timeline(tenant_id)
.await
{
Ok(importing) => {
if importing {
return Err(ApiError::Conflict(
"Cannot shard split during timeline import".to_string(),
));
}
}
Err(err) => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Failed to check for running imports: {err}"
)));
}
}
let new_shard_count = ShardCount::new(split_req.new_shard_count);
let new_stripe_size = split_req.new_stripe_size;
@@ -8076,12 +8149,25 @@ impl Service {
candidates.extend(size_candidates);
}
// Filter out tenants in a prohibiting scheduling mode.
// Filter out tenants in a prohibiting scheduling modes
// and tenants with an ongoing import.
//
// Note that the import check here is oportunistic. An import might start
// after the check before we actually update [`TenantShard::splitting`].
// [`Self::tenant_shard_split`] checks the database whilst holding the exclusive
// tenant lock. Imports might take a long time, so the check here allows us
// to split something else instead of trying the same shard over and over.
{
let state = self.inner.read().unwrap();
candidates.retain(|i| {
let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy());
policy == Some(ShardSchedulingPolicy::Active)
let shard = state.tenants.get(&i.id);
match shard {
Some(t) => {
t.get_scheduling_policy() == ShardSchedulingPolicy::Active
&& t.importing == TimelineImportState::Idle
}
None => false,
}
});
}

View File

@@ -33,6 +33,7 @@ use crate::scheduler::{
RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
};
use crate::service::ReconcileResultRequest;
use crate::timeline_import::TimelineImportState;
use crate::{Sequence, service};
/// Serialization helper
@@ -100,6 +101,10 @@ pub(crate) struct TenantShard {
/// reconciliation, and timeline creation.
pub(crate) splitting: SplitState,
/// Flag indicating whether the tenant has an in-progress timeline import.
/// Used to disallow shard splits while an import is in progress.
pub(crate) importing: TimelineImportState,
/// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
/// is set. This flag is cleared when the tenant is popped off the delay queue.
pub(crate) delayed_reconcile: bool,
@@ -583,6 +588,7 @@ impl TenantShard {
config: TenantConfig::default(),
reconciler: None,
splitting: SplitState::Idle,
importing: TimelineImportState::Idle,
sequence: Sequence(1),
delayed_reconcile: false,
waiter: Arc::new(SeqWait::new(Sequence(0))),
@@ -1844,6 +1850,8 @@ impl TenantShard {
config: serde_json::from_str(&tsp.config).unwrap(),
reconciler: None,
splitting: tsp.splitting,
// Filled in during [`Service::startup_reconcile`]
importing: TimelineImportState::Idle,
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),

View File

@@ -14,6 +14,12 @@ use utils::{
use crate::{persistence::TimelineImportPersistence, service::Config};
#[derive(Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum TimelineImportState {
Importing,
Idle,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);

View File

@@ -501,6 +501,9 @@ class NeonEnvBuilder:
# Flag to use https listener in storage controller, generate local ssl certs,
# and force pageservers and neon_local to use https for storage controller api.
self.use_https_storage_controller_api: bool = False
# Flag to use https listener in storage broker, generate local ssl certs,
# and force pageservers and safekeepers to use https for storage broker api.
self.use_https_storage_broker_api: bool = False
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
self.pageserver_get_vectored_concurrent_io: str | None = (
@@ -1086,7 +1089,7 @@ class NeonEnv:
self.safekeepers: list[Safekeeper] = []
self.pageservers: list[NeonPageserver] = []
self.num_azs = config.num_azs
self.broker = NeonBroker(self)
self.broker = NeonBroker(self, config.use_https_storage_broker_api)
self.pageserver_remote_storage = config.pageserver_remote_storage
self.safekeepers_remote_storage = config.safekeepers_remote_storage
self.pg_version = config.pg_version
@@ -1106,6 +1109,7 @@ class NeonEnv:
config.use_https_pageserver_api
or config.use_https_safekeeper_api
or config.use_https_storage_controller_api
or config.use_https_storage_broker_api
)
self.ssl_ca_file = (
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
@@ -1178,15 +1182,18 @@ class NeonEnv:
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
"default_tenant_id": str(self.initial_tenant),
"broker": {
"listen_addr": self.broker.listen_addr(),
},
"broker": {},
"safekeepers": [],
"pageservers": [],
"endpoint_storage": {"port": self.port_distributor.get_port()},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
if config.use_https_storage_broker_api:
cfg["broker"]["listen_https_addr"] = self.broker.listen_addr()
else:
cfg["broker"]["listen_addr"] = self.broker.listen_addr()
if self.control_plane_api is not None:
cfg["control_plane_api"] = self.control_plane_api
@@ -4933,9 +4940,10 @@ class Safekeeper(LogUtils):
class NeonBroker(LogUtils):
"""An object managing storage_broker instance"""
def __init__(self, env: NeonEnv):
super().__init__(logfile=env.repo_dir / "storage_broker.log")
def __init__(self, env: NeonEnv, use_https: bool):
super().__init__(logfile=env.repo_dir / "storage_broker" / "storage_broker.log")
self.env = env
self.scheme = "https" if use_https else "http"
self.port: int = self.env.port_distributor.get_port()
self.running = False
@@ -4958,7 +4966,7 @@ class NeonBroker(LogUtils):
return f"127.0.0.1:{self.port}"
def client_url(self):
return f"http://{self.listen_addr()}"
return f"{self.scheme}://{self.listen_addr()}"
def assert_no_errors(self):
assert_no_errors(self.logfile, "storage_controller", [])

View File

@@ -0,0 +1,147 @@
import random
import threading
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
def check_pinned_entries(cur):
# some LFC buffer can be temporary locked by autovacuum or background writer
for _ in range(10):
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_pinned'")
n_pinned = cur.fetchall()[0][0]
if n_pinned == 0:
break
time.sleep(1)
assert n_pinned == 0
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
time.sleep(1) # wait until compute_ctl complete downgrade of extension to default version
cur.execute("alter extension neon update to '1.6'")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
log.info(f"Prewarm progress: {(prewarm_info[1] + prewarm_info[2]) * 100 // prewarm_info[0]}%")
assert lfc_used_pages > 10000
assert (
prewarm_info[0] > 0
and prewarm_info[1] > 0
and prewarm_info[0] == prewarm_info[1] + prewarm_info[2]
)
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
check_pinned_entries(cur)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 10000
n_threads = 4
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
cur.execute(
"create table accounts(id integer primary key, balance bigint default 0, payload text default repeat('?', 1000)) with (fillfactor=10)"
)
cur.execute(f"insert into accounts(id) values (generate_series(1,{n_records}))")
cur.execute("select get_local_cache_state()")
lfc_state = cur.fetchall()[0][0]
running = True
def workload():
conn = endpoint.connect()
cur = conn.cursor()
n_transfers = 0
while running:
src = random.randint(1, n_records)
dst = random.randint(1, n_records)
cur.execute("update accounts set balance=balance-100 where id=%s", (src,))
cur.execute("update accounts set balance=balance+100 where id=%s", (dst,))
n_transfers += 1
log.info(f"Number of transfers: {n_transfers}")
def prewarm():
conn = endpoint.connect()
cur = conn.cursor()
n_prewarms = 0
while running:
cur.execute("alter system set neon.file_cache_size_limit='1MB'")
cur.execute("select pg_reload_conf()")
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
cur.execute("select pg_reload_conf()")
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
n_prewarms += 1
log.info(f"Number of prewarms: {n_prewarms}")
workload_threads = []
for _ in range(n_threads):
t = threading.Thread(target=workload)
workload_threads.append(t)
t.start()
prewarm_thread = threading.Thread(target=prewarm)
prewarm_thread.start()
time.sleep(20)
running = False
for t in workload_threads:
t.join()
prewarm_thread.join()
cur.execute("select sum(balance) from accounts")
total_balance = cur.fetchall()[0][0]
assert total_balance == 0
check_pinned_entries(cur)

View File

@@ -6,6 +6,7 @@ import pytest
import requests
from fixtures.neon_fixtures import NeonEnvBuilder, StorageControllerApiException
from fixtures.utils import wait_until
from fixtures.workload import Workload
def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder):
@@ -212,3 +213,24 @@ def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder):
assert reload_error_cnt > 0
wait_until(reload_failed)
def test_storage_broker_https_api(neon_env_builder: NeonEnvBuilder):
"""
Test HTTPS storage broker API.
1. Make /status request to HTTPS API to ensure it's appropriately configured.
2. Generate simple workload to ensure that SK -> broker -> PS communication works well.
"""
neon_env_builder.use_https_storage_broker_api = True
env = neon_env_builder.init_start()
# 1. Simple check that HTTPS is enabled and works.
url = env.broker.client_url() + "/status"
assert url.startswith("https://")
requests.get(url, verify=str(env.ssl_ca_file)).raise_for_status()
# 2. Simple workload to check that SK -> broker -> PS communication works over HTTPS.
workload = Workload(env, env.initial_tenant, env.initial_timeline)
workload.init()
workload.write_rows(10)
workload.validate()