Compare commits

...

2 Commits

Author SHA1 Message Date
John Spray
5d404698a8 inherit event key range 2024-09-12 18:56:41 +00:00
John Spray
afe3c32b66 pageserver: NeonMQ, a prototype message queue 2024-09-12 18:46:36 +00:00
6 changed files with 527 additions and 3 deletions

22
NEONMQ.md Normal file
View File

@@ -0,0 +1,22 @@
# NeonMQ
## High level design
In the prototype, the pageserver exposes HTTP endpoints for produce and consume.
In a real system, produce would go via the safekeeper. Topics would be 'timelines'
from the safekeeper's point of view.
A Timeline is used for storage. In the prototype this is just the same timeline that
stores some real database content too. In a real system, it might be a dedicated timeline.
## Server
## Client
Producers POST their messages.
Consumers poll with a topic and an offset, and receive the message stored at that offset; the client advances the offset by one after each message it consumes.

25
demo.sh Executable file
View File

@@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Demo: reset and start a local neon cluster, then exercise the NeonMQ
# prototype via the neonmq.py client. Tenant/timeline IDs are fixed so the
# commented commands below can be copy-pasted as-is.
cargo neon stop
rm -rf .neon
cargo neon init
cargo neon start
export TENANT_ID=31ad8e1bfec849f61227b4c09f480b5f
export TIMELINE_ID=514099e6b4fad0c0612b3a6b8cf785fb
export CHILD_TIMELINE_ID=85321f977b4f9579c5fce1948f484652
cargo neon tenant create --tenant-id=$TENANT_ID --timeline-id=$TIMELINE_ID
# Produce some:
# ./neonmq.py produce mytopic ohai
# Show consumption:
# ./neonmq.py consume mytopic 1
# for word in shipping faster with postgres ; do ./neonmq.py produce mytopic $word ; done
# Make a child
# cargo neon timeline branch --tenant-id=$TENANT_ID --timeline-id=$CHILD_TIMELINE_ID --ancestor-branch-name=main --branch-name=child1
# Produce to child
# ./neonmq.py produce mytopic ohai $CHILD_TIMELINE_ID

View File

@@ -47,6 +47,9 @@ pub const AUX_KEY_PREFIX: u8 = 0x62;
/// The key prefix of ReplOrigin keys.
pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
/// The key prefix of Event keys
pub const EVENT_KEY_PREFIX: u8 = 0x64;
/// Check if the key falls in the range of metadata keys.
pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
@@ -89,6 +92,24 @@ impl Key {
}
}
/// Range of metadata keys that are NOT inherited by child timelines:
/// `[METADATA_KEY_BEGIN_PREFIX, EVENT_KEY_PREFIX)`. The end bound stops at
/// `EVENT_KEY_PREFIX`, so event keys are excluded from the non-inherited
/// range (i.e. they are visible on child branches).
pub const fn metadata_noninherited_key_range() -> Range<Self> {
Key {
field1: METADATA_KEY_BEGIN_PREFIX,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}..Key {
// Exclusive end: the first event key.
field1: EVENT_KEY_PREFIX,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}
}
/// Get the range of aux keys.
pub fn metadata_aux_key_range() -> Range<Self> {
Key {
@@ -679,7 +700,7 @@ pub fn repl_origin_key_range() -> Range<Key> {
/// Non inherited range for vectored get.
pub const NON_INHERITED_RANGE: Range<Key> = AUX_FILES_KEY..AUX_FILES_KEY.next();
/// Sparse keyspace range for vectored get. Missing key error will be ignored for this range.
pub const NON_INHERITED_SPARSE_RANGE: Range<Key> = Key::metadata_key_range();
pub const NON_INHERITED_SPARSE_RANGE: Range<Key> = Key::metadata_noninherited_key_range();
impl Key {
// AUX_FILES currently stores only data for logical replication (slots etc), and

64
neonmq.py Executable file
View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""Toy NeonMQ client talking to the pageserver's HTTP endpoints.

Usage:
  neonmq.py produce <topic> <data> [timeline_id]
  neonmq.py consume <topic> <offset> [timeline_id]

When timeline_id is omitted, the first timeline of the first tenant is used.
"""
import sys
import time

import requests

PAGESERVER = "http://localhost:9898"


def _default_tenant_and_timeline():
    """Return (tenant_id, timeline_id) of the first tenant's first timeline."""
    tenants_r = requests.get(f"{PAGESERVER}/v1/tenant")
    tenants_r.raise_for_status()
    tenant_id = tenants_r.json()[0]["id"]
    timelines_r = requests.get(f"{PAGESERVER}/v1/tenant/{tenant_id}/timeline")
    timelines_r.raise_for_status()
    timeline_id = timelines_r.json()[0]["timeline_id"]
    return tenant_id, timeline_id


def _timeline_arg(default_timeline_id):
    """Optional 4th CLI argument overrides the auto-detected timeline."""
    return sys.argv[4] if len(sys.argv) > 4 else default_timeline_id


def consume(tenant_id, timeline_id, topic_name, offset):
    """Poll the consume endpoint forever, printing each event's payload.

    The server long-polls and answers 408 when no event exists at `offset`
    yet; in that case we immediately start another poll.
    """
    while True:
        try:
            response = requests.get(
                f"{PAGESERVER}/v1/tenant/{tenant_id}/timeline/{timeline_id}/event_consume/{topic_name}/{offset}"
            )
            if response.status_code == 200:
                print(response.content)
                # Payload is serialized as a JSON array of byte values.
                stringized = "".join(map(chr, response.json()["payload"]))
                print(stringized)
                offset += 1
            elif response.status_code == 408:
                # Server-side long-poll expired with no data: poll again.
                pass
            else:
                print(response.content)
                response.raise_for_status()
        except Exception as e:
            # Best effort: report, back off, retry (e.g. pageserver restart).
            print(e)
            time.sleep(5)


def produce(tenant_id, timeline_id, topic_name, data):
    """POST one message to the produce endpoint; fail loudly on non-200."""
    response = requests.post(
        f"{PAGESERVER}/v1/tenant/{tenant_id}/timeline/{timeline_id}/event_produce/{topic_name}",
        data=data,
    )
    if response.status_code != 200:
        print(response.content)
        response.raise_for_status()


def main():
    if len(sys.argv) < 4 or sys.argv[1] not in ("produce", "consume"):
        sys.exit(__doc__)
    tenant_id, default_timeline_id = _default_tenant_and_timeline()
    timeline_id = _timeline_arg(default_timeline_id)
    if sys.argv[1] == "consume":
        consume(tenant_id, timeline_id, sys.argv[2], int(sys.argv[3]))
    else:
        produce(tenant_id, timeline_id, sys.argv[2], sys.argv[3])


if __name__ == "__main__":
    main()

View File

@@ -4,11 +4,15 @@
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::io::Read;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use anyhow::{anyhow, Context, Result};
use bytes::Buf;
use bytes::Bytes;
use enumset::EnumSet;
use futures::StreamExt;
use futures::TryFutureExt;
@@ -77,6 +81,8 @@ use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::storage_layer::LayerName;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::EventAck;
use crate::tenant::timeline::EventOffset;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
@@ -2008,6 +2014,80 @@ async fn getpage_at_lsn_handler(
.await
}
/// HTTP handler: POST `.../event_produce/:topic_name`
///
/// Reads the raw request body as the event payload, appends it to the topic
/// (the topic is created on first use by `Topics::produce`), and responds
/// with the offset assigned to the event.
async fn timeline_event_produce(
    mut request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    let topic_name: String = parse_request_param(&request, "topic_name")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
    let state = get_state(&request);
    let timeline =
        active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
            .await?;
    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
    // `to_bytes` collects the body into a single contiguous `Bytes` directly,
    // avoiding the aggregate-then-`read_to_end`-then-copy round trip.
    let payload = hyper::body::to_bytes(request.body_mut())
        .await
        .context("Failed to read request body")
        .map_err(ApiError::BadRequest)?;
    if payload.is_empty() {
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "missing request body"
        )));
    }
    let offset = timeline
        .topics
        .produce(topic_name, &timeline, payload, &ctx)
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, EventAck { offset })
}
/// HTTP handler: GET `.../event_consume/:topic_name/:offset`
///
/// Long-poll implementation: retries `Topics::consume` every 100ms until an
/// event exists at `offset`, and returns 408 REQUEST_TIMEOUT after ~5s so
/// the client can immediately start another poll.
async fn timeline_event_consume(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let topic_name: String = parse_request_param(&request, "topic_name")?;
let offset: EventOffset = parse_request_param(&request, "offset")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
// Overall deadline for this long-poll request.
let deadline = Instant::now() + Duration::from_secs(5);
loop {
if Instant::now() > deadline {
// Client is expected to retry on 408.
return json_response(StatusCode::REQUEST_TIMEOUT, "no data yet");
}
// `Ok(None)` means the topic exists but nothing was produced at
// `offset` yet; an unknown topic is an error instead.
let event = timeline
.topics
.consume(topic_name.clone(), offset, &timeline, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
if let Some(event) = event {
return json_response(StatusCode::OK, event);
};
// No event yet: back off briefly before polling again.
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
async fn timeline_collect_keyspace(
request: Request<Body>,
_cancel: CancellationToken,
@@ -3084,5 +3164,17 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/import_wal",
|r| api_handler(r, put_tenant_timeline_import_wal),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/perf_info",
|r| testing_api_handler("perf_info", r, perf_info),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/event_produce/:topic_name",
|r| api_handler(r, timeline_event_produce),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/event_consume/:topic_name/:offset",
|r| api_handler(r, timeline_event_consume),
)
.any(handler_404))
}

View File

@@ -22,7 +22,7 @@ use handle::ShardTimelineId;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{
CompactKey, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
CompactKey, EVENT_KEY_PREFIX, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
@@ -35,6 +35,7 @@ use pageserver_api::{
shard::{ShardIdentity, ShardNumber, TenantShardId},
};
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::{
@@ -44,11 +45,11 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{
bin_ser::BeSer,
fs_ext, pausable_failpoint,
sync::gate::{Gate, GateGuard},
};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
@@ -62,6 +63,7 @@ use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
};
use std::{pin::pin, sync::atomic::AtomicU8};
use crate::{
aux_file::AuxFileSizeEstimator,
@@ -162,6 +164,293 @@ pub(crate) enum FlushLoopState {
Exited,
}
/// One message, both as persisted and as returned to consumers.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct Event {
// Position of this event within its topic.
offset: EventOffset,
payload: EventPayload,
}
/// Response to producers
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct EventAck {
pub(crate) offset: EventOffset,
}
/// In-memory state of a single topic.
struct Topic {
name: String,
id: TopicId,
// Offset that will be assigned to the next produced event.
next_offset: EventOffset,
}
/// Info about topic which is persisted
#[derive(Serialize, Deserialize, Debug)]
struct TopicHeader {
name: String,
next_offset: EventOffset,
}
type TopicId = u32;
type TopicName = String;
// Offsets should really be u64, using u32 for convenience to avoid packing
// u64 into Key fields.
pub(crate) type EventOffset = u32;
type EventPayload = Bytes;
/// Mutable topic registry, guarded by `Topics::inner`.
struct TopicsInner {
// Next TopicId to hand out when a new topic is created.
next_id: TopicId,
topics: HashMap<TopicName, Topic>,
}
/// Persisted index of all topics (stored at the superblock key).
#[derive(Serialize, Deserialize, Debug)]
struct TopicSuperblock {
next_id: TopicId,
topics: Vec<TopicId>,
}
/// Per-timeline message-queue state.
pub(crate) struct Topics {
// 0 until state has been loaded from the timeline, 1 afterwards.
loaded: AtomicU8,
inner: Mutex<TopicsInner>,
}
/// Key under which a topic's `TopicHeader` is persisted. Events of the same
/// topic share `field3 == topic_id` and carry the event offset in `field6`;
/// the header uses `field6 == 0`, which cannot clash because event offsets
/// start at 1.
fn topic_header_key(topic_id: TopicId) -> Key {
Key {
field1: EVENT_KEY_PREFIX,
field2: 0,
field3: topic_id,
field4: 0,
field5: 0,
field6: 0,
}
}
impl Topics {
/// Fixed all-zero key under `EVENT_KEY_PREFIX` where the `TopicSuperblock`
/// (the index of all topics) is stored.
fn superblock_key(&self) -> Key {
Key {
field1: EVENT_KEY_PREFIX,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}
}
/// Reconstruct the in-memory topic registry from the timeline: read the
/// superblock, then each topic's header. A missing superblock means no
/// topic was ever created, so start fresh with `next_id == 1`.
async fn load(&self, timeline: &Timeline, ctx: &RequestContext) -> anyhow::Result<TopicsInner> {
let superblock_bytes = match timeline
.get(self.superblock_key(), timeline.get_last_record_lsn(), ctx)
.await
{
Ok(bytes) => bytes,
Err(PageReconstructError::MissingKey(_)) => {
return Ok(TopicsInner {
next_id: 1,
topics: Default::default(),
})
}
Err(e) => return Err(anyhow::anyhow!(e)),
};
// NOTE(review): corrupt persisted JSON panics here; consider propagating
// an error instead of unwrap.
let superblock: TopicSuperblock = serde_json::from_slice(&superblock_bytes).unwrap();
let mut inner = TopicsInner {
topics: Default::default(),
next_id: superblock.next_id,
};
for topic_id in superblock.topics {
let header_bytes = timeline
.get(
topic_header_key(topic_id),
timeline.get_last_record_lsn(),
ctx,
)
.await?;
let header: TopicHeader = serde_json::from_slice(&header_bytes).unwrap();
inner.topics.insert(
header.name.clone(),
Topic {
name: header.name,
id: topic_id,
next_offset: header.next_offset,
},
);
}
Ok(inner)
}
/// Append `payload` to `topic_name`, creating the topic on first use, and
/// persist the event plus updated topic header (plus superblock, when a
/// topic was created) as one write batch. Returns the offset assigned to
/// the event.
pub(crate) async fn produce(
&self,
topic_name: String,
timeline: &Timeline,
payload: EventPayload,
ctx: &RequestContext,
) -> anyhow::Result<EventOffset> {
// Lazy one-time load of persisted state.
// NOTE(review): check-then-act race — two concurrent first producers may
// both run `load()`, and the later `*inner = new_inner` can clobber a
// topic the earlier one just created in memory. Prototype-only; confirm
// before productionizing.
if self.loaded.load(AtomicOrdering::Relaxed) == 0 {
let new_inner = self.load(timeline, ctx).await?;
*self.inner.lock().unwrap() = new_inner;
self.loaded.store(1, AtomicOrdering::Relaxed)
}
let (lsn, offset, write_keys) = {
let mut inner = self.inner.lock().unwrap();
// Taking LSN inside self.inner lock to serialize all updates
// Advance 8 bytes between imaginary LSNs, to preserve alignment
let lsn = Lsn(timeline.last_record_lsn.load().last.0 + 8);
// `next_id` is take()n only if the vacant branch below creates a topic.
let mut next_id = Some(inner.next_id);
use std::collections::hash_map;
let (offset, mut write_keys) = match inner.topics.entry(topic_name.clone()) {
hash_map::Entry::Vacant(e) => {
let topic = Topic {
next_offset: 1,
name: topic_name.clone(),
id: next_id.take().unwrap(),
};
e.insert(topic).produce(payload, lsn)?
}
hash_map::Entry::Occupied(mut e) => e.get_mut().produce(payload, lsn)?,
};
// `next_id` consumed => a new topic was created: bump the id counter
// and append a superblock update to the same batch.
if next_id.is_none() {
inner.next_id += 1;
// This write transaction includes a superblock update. This must land atomically wrt
// the writes to our newly created topic.
let superblock = TopicSuperblock {
next_id: inner.next_id,
topics: inner.topics.values().map(|t| t.id).collect(),
};
let superblock_bytes = serde_json::to_vec(&superblock).unwrap();
let value = Value::Image(superblock_bytes.into());
write_keys.push((
self.superblock_key().to_compact(),
lsn,
value.serialized_size().unwrap() as usize,
value,
));
}
(lsn, offset, write_keys)
};
// FIXME: racing between picking lsn above and actually writing, all
// writes should go into a channel.
// Compose a write transaction: write the event to a page,
// and write the topic header to update knowledge of the
// next offset
let mut writer = timeline.writer().await;
writer.put_batch(write_keys, ctx).await?;
writer.finish_write(lsn);
Ok(offset)
}
/// Read the event of `topic_name` at exactly `offset`. Returns `Ok(None)`
/// when that offset has not been produced yet (callers poll); errors when
/// the topic is unknown.
pub(crate) async fn consume(
&self,
topic_name: String,
offset: EventOffset,
timeline: &Timeline,
ctx: &RequestContext,
) -> anyhow::Result<Option<Event>> {
// NOTE(review): unlike `produce`, this never calls `load()`, so a consumer
// polling before any produce gets "Topic not found" even for topics that
// exist in storage — confirm whether that is intended.
let topic_id = {
let inner = self.inner.lock().unwrap();
let Some(topic) = inner.topics.get(&topic_name) else {
return Err(anyhow::anyhow!("Topic not found"));
};
if topic.next_offset <= offset {
// Nothing produced at this offset yet.
return Ok(None);
}
topic.id
};
let event_key = Key {
field1: EVENT_KEY_PREFIX,
field2: 0,
field3: topic_id,
field4: 0,
field5: 0,
field6: offset,
};
let lsn = timeline.last_record_lsn.load().last;
match timeline.get(event_key, lsn, ctx).await {
Ok(event_bytes) => {
let event = serde_json::from_slice(&event_bytes).unwrap();
tracing::info!("Event found at {}/{}", topic_name, offset);
Ok(Some(event))
}
Err(PageReconstructError::MissingKey(_)) => {
// Unexpected: we checked against next_offset above
tracing::warn!("No event found at {}/{}", topic_name, offset);
Ok(None)
}
Err(e) => Err(e.into()),
}
}
}
type KeyBatch = Vec<(CompactKey, Lsn, usize, Value)>;
impl Topic {
pub(crate) fn produce(
&mut self,
payload: EventPayload,
lsn: Lsn,
) -> anyhow::Result<(EventOffset, KeyBatch)> {
let offset = self.next_offset;
self.next_offset += 1;
let event = Event { offset, payload };
let header_key = topic_header_key(self.id);
let event_key = Key {
field1: EVENT_KEY_PREFIX,
field2: 0,
field3: self.id,
field4: 0,
field5: 0,
field6: offset,
};
let header = TopicHeader {
name: self.name.clone(),
next_offset: self.next_offset,
};
let header_bytes = serde_json::to_vec(&header).unwrap().into();
let event_bytes = serde_json::to_vec(&event).unwrap().into();
let header_value = Value::Image(header_bytes);
let event_value = Value::Image(event_bytes);
let write_keys = vec![
(
header_key.to_compact(),
lsn,
header_value.serialized_size().unwrap() as usize,
header_value,
),
(
event_key.to_compact(),
lsn,
event_value.serialized_size().unwrap() as usize,
event_value,
),
];
Ok((offset, write_keys))
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ImageLayerCreationMode {
/// Try to create image layers based on `time_for_new_image_layer`. Used in compaction code path.
@@ -430,6 +719,8 @@ pub struct Timeline {
pub(crate) l0_flush_global_state: L0FlushGlobalState,
pub(crate) handles: handle::PerTimelineState<crate::page_service::TenantManagerTypes>,
pub(crate) topics: Topics,
}
pub struct WalReceiverInfo {
@@ -2240,6 +2531,15 @@ impl Timeline {
l0_flush_global_state: resources.l0_flush_global_state,
handles: Default::default(),
topics: Topics {
inner: Mutex::new(TopicsInner {
// TODO: recover from disk on startup
next_id: 123,
topics: Default::default(),
}),
loaded: Default::default(),
},
};
if aux_file_policy == Some(AuxFilePolicy::V1) {