Add more information to timeline-related APIs (#1673)

Resolves #1488.

- implemented `GET tenant/:tenant_id/timeline/:timeline_id/wal_receiver` endpoint
- made `thread_mgr::spawn` return the spawned thread's `thread_id`
- added `latest_gc_cutoff_lsn` field to `LocalTimelineInfo` struct
This commit is contained in:
Thang Pham
2022-05-16 11:05:43 -04:00
committed by GitHub
parent c41549f630
commit e4a70faa08
8 changed files with 204 additions and 17 deletions

View File

@@ -123,6 +123,53 @@ paths:
schema:
$ref: "#/components/schemas/Error"
# Read-only endpoint returning the WAL receiver entry registered for one
# tenant/timeline pair; answers 404 when no entry exists for that pair.
/v1/tenant/{tenant_id}/timeline/{timeline_id}/wal_receiver:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
description: Get wal receiver's data attached to the timeline
responses:
"200":
description: WalReceiverEntry
content:
application/json:
schema:
$ref: "#/components/schemas/WalReceiverEntry"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"404":
description: Error when no wal receiver is running or found
content:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/attach:
parameters:
@@ -520,6 +567,21 @@ components:
type: integer
current_logical_size_non_incremental:
type: integer
# Data of one WAL receiver, as returned by the wal_receiver endpoint.
# NOTE(review): the two `last_received_msg_*` fields are `Option`s on the server
# and the Python test expects both keys to always be present, so they are
# presumably sent as JSON null when unset - confirm whether `nullable: true`
# should be declared for them.
WalReceiverEntry:
type: object
required:
- thread_id
- wal_producer_connstr
properties:
# ID of the thread running this WAL receiver
thread_id:
type: integer
# connection string of the WAL producer the receiver streams from
wal_producer_connstr:
type: string
# LSN of the last received message (not listed under `required`)
last_received_msg_lsn:
type: string
format: hex
# timestamp of the last received message, in microseconds (not listed under `required`)
last_received_msg_ts:
type: integer
Error:
type: object

View File

@@ -224,6 +224,30 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::OK, timeline_info)
}
/// Handles `GET /v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver`.
///
/// Parses the tenant and timeline IDs from the request path, checks the caller's
/// permission for the tenant, and responds with the matching WAL receiver entry
/// as JSON. Yields `ApiError::NotFound` when no entry is registered for the pair.
async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
    check_permission(&request, Some(tenant_id))?;
    let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;

    // The lookup runs via `spawn_blocking`, inside a span tagged with both IDs.
    let lookup = tokio::task::spawn_blocking(move || {
        let span = info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id);
        let _guard = span.entered();
        crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
    });

    match lookup.await.map_err(ApiError::from_err)? {
        Some(entry) => json_response(StatusCode::OK, entry),
        None => Err(ApiError::NotFound(format!(
            "WAL receiver not found for tenant {} and timeline {}",
            tenant_id, timeline_id
        ))),
    }
}
async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -485,6 +509,10 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver",
wal_receiver_get_handler,
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/attach",
timeline_attach_handler,

View File

@@ -281,6 +281,7 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
if let Err(e) = &gc_spawn_result {

View File

@@ -139,7 +139,7 @@ pub fn spawn<F>(
name: &str,
shutdown_process_on_error: bool,
f: F,
) -> std::io::Result<()>
) -> std::io::Result<u64>
where
F: FnOnce() -> anyhow::Result<()> + Send + 'static,
{
@@ -193,7 +193,7 @@ where
drop(jh_guard);
// The thread is now running. Nothing more to do here
Ok(())
Ok(thread_id)
}
/// This wrapper function runs in a newly-spawned thread. It initializes the

View File

@@ -45,6 +45,8 @@ pub struct LocalTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
pub prev_record_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<usize>,
@@ -68,6 +70,7 @@ impl LocalTimelineInfo {
disk_consistent_lsn: datadir_tline.tline.get_disk_consistent_lsn(),
last_record_lsn,
prev_record_lsn: Some(datadir_tline.tline.get_prev_record_lsn()),
latest_gc_cutoff_lsn: *datadir_tline.tline.get_latest_gc_cutoff_lsn(),
timeline_state: LocalTimelineState::Loaded,
current_logical_size: Some(datadir_tline.get_current_logical_size()),
current_logical_size_non_incremental: if include_non_incremental_logical_size {
@@ -91,6 +94,7 @@ impl LocalTimelineInfo {
disk_consistent_lsn: metadata.disk_consistent_lsn(),
last_record_lsn: metadata.disk_consistent_lsn(),
prev_record_lsn: metadata.prev_record_lsn(),
latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
timeline_state: LocalTimelineState::Unloaded,
current_logical_size: None,
current_logical_size_non_incremental: None,

View File

@@ -18,6 +18,8 @@ use lazy_static::lazy_static;
use postgres_ffi::waldecoder::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::cell::Cell;
use std::collections::HashMap;
use std::str::FromStr;
@@ -35,11 +37,19 @@ use utils::{
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
};
//
// We keep one WAL Receiver active per timeline.
//
struct WalReceiverEntry {
/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`.
/// We keep one WAL receiver active per timeline.
///
/// The entry is serializable; a clone of it is what the `wal_receiver` HTTP
/// endpoint sends back as JSON.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WalReceiverEntry {
/// ID returned by `thread_mgr::spawn` for the thread running this receiver
thread_id: u64,
/// connection string of the WAL producer to stream from
wal_producer_connstr: String,
/// LSN of the last received message, (de)serialized through its string form;
/// `None` when the entry is first created
#[serde_as(as = "Option<DisplayFromStr>")]
last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds since the UNIX epoch) of the last received
/// message; `None` when the entry is first created
last_received_msg_ts: Option<u128>,
}
lazy_static! {
@@ -74,7 +84,7 @@ pub fn launch_wal_receiver(
receiver.wal_producer_connstr = wal_producer_connstr.into();
}
None => {
thread_mgr::spawn(
let thread_id = thread_mgr::spawn(
ThreadKind::WalReceiver,
Some(tenantid),
Some(timelineid),
@@ -88,7 +98,10 @@ pub fn launch_wal_receiver(
)?;
let receiver = WalReceiverEntry {
thread_id,
wal_producer_connstr: wal_producer_connstr.into(),
last_received_msg_lsn: None,
last_received_msg_ts: None,
};
receivers.insert((tenantid, timelineid), receiver);
@@ -99,15 +112,13 @@ pub fn launch_wal_receiver(
Ok(())
}
// Look up current WAL producer connection string in the hash table
fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> String {
/// Look up a WAL receiver's data in the global `WAL_RECEIVERS`.
///
/// Returns a clone of the entry registered for `(tenant_id, timeline_id)`, or
/// `None` when no entry exists for that pair.
///
/// # Panics
///
/// Panics if the `WAL_RECEIVERS` mutex is poisoned.
pub fn get_wal_receiver_entry(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> Option<WalReceiverEntry> {
let receivers = WAL_RECEIVERS.lock().unwrap();
receivers
.get(&(tenantid, timelineid))
.unwrap()
.wal_producer_connstr
.clone()
receivers.get(&(tenant_id, timeline_id)).cloned()
}
//
@@ -118,7 +129,18 @@ fn thread_main(conf: &'static PageServerConf, tenant_id: ZTenantId, timeline_id:
info!("WAL receiver thread started");
// Look up the current WAL producer address
let wal_producer_connstr = get_wal_producer_connstr(tenant_id, timeline_id);
let wal_producer_connstr = {
match get_wal_receiver_entry(tenant_id, timeline_id) {
Some(e) => e.wal_producer_connstr,
None => {
info!(
"Unable to create the WAL receiver thread: no WAL receiver entry found for tenant {} and timeline {}",
tenant_id, timeline_id
);
return;
}
}
};
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
// and start streaming WAL from it.
@@ -318,6 +340,28 @@ fn walreceiver_main(
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
let ts = SystemTime::now();
// Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS`.
// The lock lives in its own scope so it is released as soon as the entry is updated.
{
    let mut receivers = WAL_RECEIVERS.lock().unwrap();
    let entry = match receivers.get_mut(&(tenant_id, timeline_id)) {
        Some(e) => e,
        None => {
            anyhow::bail!(
                "no WAL receiver entry found for tenant {} and timeline {}",
                tenant_id,
                timeline_id
            );
        }
    };
    entry.last_received_msg_lsn = Some(last_lsn);
    // Stored as microseconds since the UNIX epoch.
    entry.last_received_msg_ts = Some(
        ts.duration_since(SystemTime::UNIX_EPOCH)
            // `duration_since` only fails when `ts` is earlier than the epoch,
            // so the invariant is that the time lies *after* it.
            .expect("Received message time should be after UNIX EPOCH!")
            .as_micros(),
    );
}
// Send zenith feedback message.
// Regular standby_status_update fields are put into this message.
let zenith_status_update = ZenithFeedback {

View File

@@ -1,6 +1,12 @@
from uuid import uuid4, UUID
import pytest
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
from fixtures.zenith_fixtures import (
DEFAULT_BRANCH_NAME,
ZenithEnv,
ZenithEnvBuilder,
ZenithPageserverHttpClient,
ZenithPageserverApiException,
)
# test that we cannot override node id
@@ -48,6 +54,39 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID):
assert local_timeline_details['timeline_state'] == 'Loaded'
# Requesting WAL receiver data for a freshly created tenant/timeline, with no
# compute node started, is expected to raise ZenithPageserverApiException.
# NOTE(review): indentation is not visible here. If the final assert sits inside
# the `with pytest.raises(...)` block it never executes once the call above it
# raises - confirm its placement, and confirm that the exception text really
# contains "Not Found" before moving it out of the block.
def test_pageserver_http_get_wal_receiver_not_found(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
client = env.pageserver.http_client()
tenant_id, timeline_id = env.zenith_cli.create_tenant()
# no PG compute node is running, so no WAL receiver is running
with pytest.raises(ZenithPageserverApiException) as e:
_ = client.wal_receiver_get(tenant_id, timeline_id)
assert "Not Found" in str(e.value)
def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv):
    """Check that the `wal_receiver` endpoint reports data for a running compute node.

    Starts a compute node on a new tenant, verifies the set of keys in the
    response, then makes a DB modification and expects the reported
    `last_received_msg_lsn` to advance.
    """
    def lsn_to_int(lsn):
        # LSNs arrive as strings; comparing them directly is lexicographic and
        # gives wrong answers once the hex digit counts differ (e.g. "0/9A" vs "0/16B").
        # The field is optional, so a missing value sorts before every real LSN.
        if lsn is None:
            return -1
        # NOTE(review): assumes the "<hi>/<lo>" hex rendering, falling back to
        # plain hex - confirm against the server-side `Lsn` string format.
        hi, sep, lo = lsn.partition("/")
        if not sep:
            return int(lsn, 16)
        return (int(hi, 16) << 32) | int(lo, 16)

    env = zenith_simple_env
    client = env.pageserver.http_client()

    tenant_id, timeline_id = env.zenith_cli.create_tenant()
    pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)

    res = client.wal_receiver_get(tenant_id, timeline_id)
    assert list(res.keys()) == [
        "thread_id",
        "wal_producer_connstr",
        "last_received_msg_lsn",
        "last_received_msg_ts",
    ]

    # make a DB modification then expect getting a new WAL receiver's data
    pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
    res2 = client.wal_receiver_get(tenant_id, timeline_id)
    assert lsn_to_int(res2["last_received_msg_lsn"]) > lsn_to_int(res["last_received_msg_lsn"])
def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
client = env.pageserver.http_client()

View File

@@ -786,6 +786,15 @@ class ZenithPageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def wal_receiver_get(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
    """Fetch the WAL receiver data of a timeline from the pageserver HTTP API.

    Issues a GET against the `wal_receiver` endpoint, hands the response to
    `verbose_error`, and returns the decoded JSON body, which must be a dict.
    """
    url = f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/wal_receiver"
    response = self.get(url)
    self.verbose_error(response)
    payload = response.json()
    assert isinstance(payload, dict)
    return payload
def get_metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)