mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 22:49:58 +00:00
Compare commits
7 Commits
feature/df
...
chore/benc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d3dc2d311 | ||
|
|
0c302ba127 | ||
|
|
7139ba08c8 | ||
|
|
f3e0a31e5d | ||
|
|
36c82121fb | ||
|
|
716bb82d37 | ||
|
|
2bb450b09a |
@@ -271,11 +271,11 @@ impl CompactionScheduler {
|
||||
current_version.options.ttl,
|
||||
&schema_metadata_manager,
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!(e; "Failed to get ttl for region: {}", region_id);
|
||||
TimeToLive::default()
|
||||
});
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!(e; "Failed to get ttl for region: {}", region_id);
|
||||
TimeToLive::default()
|
||||
});
|
||||
|
||||
debug!(
|
||||
"Pick compaction strategy {:?} for region: {}, ttl: {:?}",
|
||||
@@ -351,7 +351,7 @@ impl CompactionScheduler {
|
||||
job_id: None,
|
||||
reason: e.reason,
|
||||
}
|
||||
.fail();
|
||||
.fail();
|
||||
}
|
||||
|
||||
error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
|
||||
|
||||
@@ -27,9 +27,10 @@ mod handle_truncate;
|
||||
mod handle_write;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use common_base::Plugins;
|
||||
use common_meta::key::SchemaMetadataManagerRef;
|
||||
@@ -432,6 +433,12 @@ impl<S: LogStore> WorkerStarter<S> {
|
||||
let running = Arc::new(AtomicBool::new(true));
|
||||
let now = self.time_provider.current_time_millis();
|
||||
let id_string = self.id.to_string();
|
||||
|
||||
let slow_threshold = std::env::var("slow_threshold")
|
||||
.ok()
|
||||
.and_then(|v| u64::from_str(&v).ok())
|
||||
.unwrap_or(1000);
|
||||
|
||||
let mut worker_thread = RegionWorkerLoop {
|
||||
id: self.id,
|
||||
config: self.config.clone(),
|
||||
@@ -471,6 +478,9 @@ impl<S: LogStore> WorkerStarter<S> {
|
||||
region_count: REGION_COUNT.with_label_values(&[&id_string]),
|
||||
region_edit_queues: RegionEditQueues::default(),
|
||||
schema_metadata_manager: self.schema_metadata_manager,
|
||||
metrics: WorkerMetrics::default(),
|
||||
stall_start: None,
|
||||
slow_threshold: Duration::from_millis(slow_threshold),
|
||||
};
|
||||
let handle = common_runtime::spawn_global(async move {
|
||||
worker_thread.run().await;
|
||||
@@ -624,6 +634,46 @@ impl StalledRequests {
|
||||
}
|
||||
}
|
||||
|
||||
/// Local metrics of the worker.
|
||||
#[derive(Debug, Default)]
|
||||
struct WorkerMetrics {
|
||||
/// Elapsed time of the select block.
|
||||
select_cost: Duration,
|
||||
/// Number of times waking up by flush.
|
||||
num_flush_wake: usize,
|
||||
/// Number of times waking up by periodical tasks.
|
||||
num_periodical_wake: usize,
|
||||
/// Number of requests to handle.
|
||||
num_requests: usize,
|
||||
/// Number of stalled requests to process.
|
||||
num_stalled_request_processed: usize,
|
||||
/// Number of stalled requests to add.
|
||||
num_stalled_request_added: usize,
|
||||
/// Number of write stall.
|
||||
num_stall: usize,
|
||||
|
||||
/// Total time of handling stall requests.
|
||||
handle_stall_cost: Duration,
|
||||
/// Total time of handling requests.
|
||||
handle_request_cost: Duration,
|
||||
/// Total time of handling write requests.
|
||||
handle_write_request_cost: Duration,
|
||||
/// Total time of handling periodical tasks.
|
||||
handle_periodical_task_cost: Duration,
|
||||
/// Total time of handling requests after waking up.
|
||||
handle_cost: Duration,
|
||||
|
||||
// Cost of handle write
|
||||
/// Total time of flushing the worker.
|
||||
flush_worker_cost: Duration,
|
||||
/// Total time of writing WAL.
|
||||
write_wal_cost: Duration,
|
||||
/// Total time of writing memtables.
|
||||
write_memtable_cost: Duration,
|
||||
/// Total time of stall.
|
||||
stall_cost: Duration,
|
||||
}
|
||||
|
||||
/// Background worker loop to handle requests.
|
||||
struct RegionWorkerLoop<S> {
|
||||
/// Id of the worker.
|
||||
@@ -682,6 +732,11 @@ struct RegionWorkerLoop<S> {
|
||||
region_edit_queues: RegionEditQueues,
|
||||
/// Database level metadata manager.
|
||||
schema_metadata_manager: SchemaMetadataManagerRef,
|
||||
/// Metrics of the worker in one loop.
|
||||
metrics: WorkerMetrics,
|
||||
/// Last stall start time.
|
||||
stall_start: Option<Instant>,
|
||||
slow_threshold: Duration,
|
||||
}
|
||||
|
||||
impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
@@ -697,6 +752,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
// Buffer to retrieve requests from receiver.
|
||||
let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size);
|
||||
|
||||
let mut select_start = Instant::now();
|
||||
while self.running.load(Ordering::Relaxed) {
|
||||
// Clear the buffer before handling next batch of requests.
|
||||
buffer.clear();
|
||||
@@ -714,6 +770,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
}
|
||||
}
|
||||
recv_res = self.flush_receiver.changed() => {
|
||||
self.metrics.num_flush_wake += 1;
|
||||
if recv_res.is_err() {
|
||||
// The channel is disconnected.
|
||||
break;
|
||||
@@ -729,12 +786,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
}
|
||||
}
|
||||
_ = &mut sleep => {
|
||||
self.metrics.num_periodical_wake += 1;
|
||||
// Timeout. Checks periodical tasks.
|
||||
self.handle_periodical_tasks();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
self.metrics.select_cost = select_start.elapsed();
|
||||
let handle_start = Instant::now();
|
||||
|
||||
if self.flush_receiver.has_changed().unwrap_or(false) {
|
||||
// Always checks whether we could process stalled requests to avoid a request
|
||||
// hangs too long.
|
||||
@@ -751,12 +812,28 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
self.metrics.num_requests = buffer.len();
|
||||
|
||||
self.listener.on_recv_requests(buffer.len());
|
||||
|
||||
let start = Instant::now();
|
||||
self.handle_requests(&mut buffer).await;
|
||||
self.metrics.handle_request_cost = start.elapsed();
|
||||
|
||||
let start = Instant::now();
|
||||
self.handle_periodical_tasks();
|
||||
self.metrics.handle_periodical_task_cost = start.elapsed();
|
||||
|
||||
self.metrics.handle_cost = handle_start.elapsed();
|
||||
if self.metrics.handle_cost + self.metrics.select_cost > self.slow_threshold {
|
||||
info!(
|
||||
"Region worker too slow, id: {}, metrics: {:?}",
|
||||
self.id, self.metrics
|
||||
);
|
||||
}
|
||||
// Clear the metrics.
|
||||
self.metrics = WorkerMetrics::default();
|
||||
select_start = Instant::now();
|
||||
}
|
||||
|
||||
self.clean().await;
|
||||
@@ -804,7 +881,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
|
||||
// Handles all write requests first. So we can alter regions without
|
||||
// considering existing write requests.
|
||||
let start = Instant::now();
|
||||
self.handle_write_requests(write_requests, true).await;
|
||||
self.metrics.handle_write_request_cost = start.elapsed();
|
||||
|
||||
self.handle_ddl_requests(ddl_requests).await;
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ use store_api::region_request::RegionCompactRequest;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::RegionNotFoundSnafu;
|
||||
use crate::metrics::COMPACTION_REQUEST_COUNT;
|
||||
use crate::metrics::{COMPACTION_REQUEST_COUNT};
|
||||
use crate::region::MitoRegionRef;
|
||||
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
use std::collections::{hash_map, HashMap};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_telemetry::debug;
|
||||
@@ -43,7 +44,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
}
|
||||
|
||||
// Flush this worker if the engine needs to flush.
|
||||
let start = Instant::now();
|
||||
self.maybe_flush_worker();
|
||||
self.metrics.flush_worker_cost += start.elapsed();
|
||||
|
||||
if self.should_reject_write() {
|
||||
// The memory pressure is still too high, reject write requests.
|
||||
@@ -55,6 +58,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
|
||||
if self.write_buffer_manager.should_stall() && allow_stall {
|
||||
self.stalled_count.add(write_requests.len() as i64);
|
||||
self.metrics.num_stalled_request_added += write_requests.len();
|
||||
self.metrics.num_stall += 1;
|
||||
self.stall_start = Some(Instant::now());
|
||||
self.stalled_requests.append(&mut write_requests);
|
||||
self.listener.on_write_stall();
|
||||
return;
|
||||
@@ -70,6 +76,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
|
||||
// Write to WAL.
|
||||
{
|
||||
let start = Instant::now();
|
||||
let _timer = WRITE_STAGE_ELAPSED
|
||||
.with_label_values(&["write_wal"])
|
||||
.start_timer();
|
||||
@@ -87,12 +94,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
|
||||
region_ctx.set_next_entry_id(last_entry_id + 1);
|
||||
}
|
||||
self.metrics.write_wal_cost += start.elapsed();
|
||||
}
|
||||
Err(e) => {
|
||||
// Failed to write wal.
|
||||
for mut region_ctx in region_ctxs.into_values() {
|
||||
region_ctx.set_error(e.clone());
|
||||
}
|
||||
self.metrics.write_wal_cost += start.elapsed();
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -101,6 +110,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
let (mut put_rows, mut delete_rows) = (0, 0);
|
||||
// Write to memtables.
|
||||
{
|
||||
let start = Instant::now();
|
||||
let _timer = WRITE_STAGE_ELAPSED
|
||||
.with_label_values(&["write_memtable"])
|
||||
.start_timer();
|
||||
@@ -109,6 +119,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
put_rows += region_ctx.put_num;
|
||||
delete_rows += region_ctx.delete_num;
|
||||
}
|
||||
self.metrics.write_memtable_cost += start.elapsed();
|
||||
}
|
||||
WRITE_ROWS_TOTAL
|
||||
.with_label_values(&["put"])
|
||||
@@ -120,13 +131,19 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
|
||||
/// Handles all stalled write requests.
|
||||
pub(crate) async fn handle_stalled_requests(&mut self) {
|
||||
let handle_stall_start = Instant::now();
|
||||
if let Some(start) = self.stall_start.take() {
|
||||
self.metrics.stall_cost += start.elapsed();
|
||||
}
|
||||
// Handle stalled requests.
|
||||
let stalled = std::mem::take(&mut self.stalled_requests);
|
||||
self.stalled_count.sub(stalled.requests.len() as i64);
|
||||
// We already stalled these requests, don't stall them again.
|
||||
for (_, (_, requests)) in stalled.requests {
|
||||
self.metrics.num_stalled_request_processed += requests.len();
|
||||
self.handle_write_requests(requests, false).await;
|
||||
}
|
||||
self.metrics.handle_stall_cost += handle_stall_start.elapsed();
|
||||
}
|
||||
|
||||
/// Rejects all stalled requests.
|
||||
@@ -149,7 +166,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
/// Handles a specific region's stalled requests.
|
||||
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
|
||||
debug!("Handles stalled requests for region {}", region_id);
|
||||
if let Some(start) = self.stall_start.take() {
|
||||
self.metrics.stall_cost += start.elapsed();
|
||||
}
|
||||
let requests = self.stalled_requests.remove(region_id);
|
||||
self.metrics.num_stalled_request_processed += requests.len();
|
||||
self.stalled_count.sub(requests.len() as i64);
|
||||
self.handle_write_requests(requests, true).await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user