Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-04 20:32:56 +00:00)

Compare commits: test/dev-b ... chore/benc (7 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 9d3dc2d311 | |
| | 0c302ba127 | |
| | 7139ba08c8 | |
| | f3e0a31e5d | |
| | 36c82121fb | |
| | 716bb82d37 | |
| | 2bb450b09a | |
@@ -271,11 +271,11 @@ impl CompactionScheduler {
             current_version.options.ttl,
             &schema_metadata_manager,
         )
         .await
         .unwrap_or_else(|e| {
             warn!(e; "Failed to get ttl for region: {}", region_id);
             TimeToLive::default()
         });

         debug!(
             "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
@@ -351,7 +351,7 @@ impl CompactionScheduler {
                 job_id: None,
                 reason: e.reason,
             }
             .fail();
         }

         error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
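The hunks above only show context around the TTL lookup: the compaction scheduler resolves the TTL for the region (consulting the region options and the schema metadata manager) and, if the lookup fails, logs a warning and falls back to `TimeToLive::default()` instead of failing the request. A minimal sketch of that fallback shape, where `Ttl` and `fetch_ttl` are illustrative stand-ins rather than GreptimeDB APIs:

```rust
use std::time::Duration;

/// Illustrative TTL type standing in for `TimeToLive` in the diff.
#[derive(Debug, Default)]
struct Ttl(Option<Duration>);

/// Hypothetical lookup that may fail, e.g. when schema metadata is missing.
fn fetch_ttl(region_id: u64) -> Result<Ttl, String> {
    Err(format!("no schema metadata for region {region_id}"))
}

fn main() {
    let region_id = 42;
    // Same shape as the diff: on error, log and fall back to the default TTL
    // rather than aborting the compaction request.
    let ttl = fetch_ttl(region_id).unwrap_or_else(|e| {
        eprintln!("Failed to get ttl for region {region_id}: {e}");
        Ttl::default()
    });
    println!("Pick compaction strategy for region {region_id}, ttl: {ttl:?}");
}
```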
@@ -27,9 +27,10 @@ mod handle_truncate;
 mod handle_write;
 use std::collections::HashMap;
 use std::path::Path;
+use std::str::FromStr;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};

 use common_base::Plugins;
 use common_meta::key::SchemaMetadataManagerRef;
@@ -432,6 +433,12 @@ impl<S: LogStore> WorkerStarter<S> {
         let running = Arc::new(AtomicBool::new(true));
         let now = self.time_provider.current_time_millis();
         let id_string = self.id.to_string();
+
+        let slow_threshold = std::env::var("slow_threshold")
+            .ok()
+            .and_then(|v| u64::from_str(&v).ok())
+            .unwrap_or(1000);
+
         let mut worker_thread = RegionWorkerLoop {
             id: self.id,
             config: self.config.clone(),
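The worker starter reads its slow-loop threshold from the `slow_threshold` environment variable and falls back to 1000 ms when the variable is missing or unparsable. The same parsing chain, extracted into a standalone sketch (only the `slow_threshold` variable name comes from the diff):

```rust
use std::str::FromStr;
use std::time::Duration;

fn main() {
    // Same chain as the diff: a missing or malformed value falls back to 1000 ms.
    let slow_threshold_ms = std::env::var("slow_threshold")
        .ok()
        .and_then(|v| u64::from_str(&v).ok())
        .unwrap_or(1000);
    let slow_threshold = Duration::from_millis(slow_threshold_ms);
    println!("slow worker threshold: {slow_threshold:?}");
}
```

Running the binary as `slow_threshold=500 ./app` would lower the threshold to 500 ms; any non-numeric value silently falls back to the default.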
@@ -471,6 +478,9 @@ impl<S: LogStore> WorkerStarter<S> {
             region_count: REGION_COUNT.with_label_values(&[&id_string]),
             region_edit_queues: RegionEditQueues::default(),
             schema_metadata_manager: self.schema_metadata_manager,
+            metrics: WorkerMetrics::default(),
+            stall_start: None,
+            slow_threshold: Duration::from_millis(slow_threshold),
         };
         let handle = common_runtime::spawn_global(async move {
             worker_thread.run().await;
@@ -624,6 +634,46 @@ impl StalledRequests {
     }
 }
 
+/// Local metrics of the worker.
+#[derive(Debug, Default)]
+struct WorkerMetrics {
+    /// Elapsed time of the select block.
+    select_cost: Duration,
+    /// Number of times waking up by flush.
+    num_flush_wake: usize,
+    /// Number of times waking up by periodical tasks.
+    num_periodical_wake: usize,
+    /// Number of requests to handle.
+    num_requests: usize,
+    /// Number of stalled requests to process.
+    num_stalled_request_processed: usize,
+    /// Number of stalled requests to add.
+    num_stalled_request_added: usize,
+    /// Number of write stalls.
+    num_stall: usize,
+
+    /// Total time of handling stalled requests.
+    handle_stall_cost: Duration,
+    /// Total time of handling requests.
+    handle_request_cost: Duration,
+    /// Total time of handling write requests.
+    handle_write_request_cost: Duration,
+    /// Total time of handling periodical tasks.
+    handle_periodical_task_cost: Duration,
+    /// Total time of handling requests after waking up.
+    handle_cost: Duration,
+
+    // Costs of handling writes.
+    /// Total time of flushing the worker.
+    flush_worker_cost: Duration,
+    /// Total time of writing WAL.
+    write_wal_cost: Duration,
+    /// Total time of writing memtables.
+    write_memtable_cost: Duration,
+    /// Total time spent stalled.
+    stall_cost: Duration,
+}
+
 /// Background worker loop to handle requests.
 struct RegionWorkerLoop<S> {
     /// Id of the worker.
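`WorkerMetrics` is plain data: every field is a counter or an accumulated `Duration`, so `#[derive(Default)]` yields a zeroed instance the loop can reset each iteration, and `#[derive(Debug)]` gives a one-line dump for the slow-loop log. A self-contained sketch of that accumulate, log, and reset pattern, using an illustrative `LoopMetrics` with just two fields:

```rust
use std::time::{Duration, Instant};

/// Illustrative per-iteration metrics, mirroring the shape of `WorkerMetrics`.
#[derive(Debug, Default)]
struct LoopMetrics {
    num_requests: usize,
    handle_cost: Duration,
}

fn main() {
    let slow_threshold = Duration::from_millis(1);
    let mut metrics = LoopMetrics::default();

    for batch in [3usize, 5, 2] {
        let start = Instant::now();
        // Stand-in for handling `batch` requests.
        std::thread::sleep(Duration::from_millis(2));
        metrics.num_requests = batch;
        metrics.handle_cost += start.elapsed();

        if metrics.handle_cost > slow_threshold {
            // `Debug` prints every field in one place, like the slow-worker log line.
            println!("loop too slow, metrics: {metrics:?}");
        }
        // Reset for the next iteration, as the worker does with `WorkerMetrics::default()`.
        metrics = LoopMetrics::default();
    }
}
```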
@@ -682,6 +732,11 @@ struct RegionWorkerLoop<S> {
     region_edit_queues: RegionEditQueues,
     /// Database level metadata manager.
     schema_metadata_manager: SchemaMetadataManagerRef,
+    /// Metrics of the worker in one loop.
+    metrics: WorkerMetrics,
+    /// Last stall start time.
+    stall_start: Option<Instant>,
+    slow_threshold: Duration,
 }
 
 impl<S: LogStore> RegionWorkerLoop<S> {
@@ -697,6 +752,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         // Buffer to retrieve requests from receiver.
         let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size);
 
+        let mut select_start = Instant::now();
         while self.running.load(Ordering::Relaxed) {
             // Clear the buffer before handling next batch of requests.
             buffer.clear();
@@ -714,6 +770,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                     }
                 }
                 recv_res = self.flush_receiver.changed() => {
+                    self.metrics.num_flush_wake += 1;
                     if recv_res.is_err() {
                         // The channel is disconnected.
                         break;
@@ -729,12 +786,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                     }
                 }
                 _ = &mut sleep => {
+                    self.metrics.num_periodical_wake += 1;
                     // Timeout. Checks periodical tasks.
                     self.handle_periodical_tasks();
                     continue;
                 }
             }
 
+            self.metrics.select_cost = select_start.elapsed();
+            let handle_start = Instant::now();
+
             if self.flush_receiver.has_changed().unwrap_or(false) {
                 // Always checks whether we could process stalled requests to avoid a request
                 // hangs too long.
@@ -751,12 +812,28 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                     Err(_) => break,
                 }
             }
+            self.metrics.num_requests = buffer.len();
 
             self.listener.on_recv_requests(buffer.len());
 
+            let start = Instant::now();
             self.handle_requests(&mut buffer).await;
+            self.metrics.handle_request_cost = start.elapsed();
 
+            let start = Instant::now();
             self.handle_periodical_tasks();
+            self.metrics.handle_periodical_task_cost = start.elapsed();
+
+            self.metrics.handle_cost = handle_start.elapsed();
+            if self.metrics.handle_cost + self.metrics.select_cost > self.slow_threshold {
+                info!(
+                    "Region worker too slow, id: {}, metrics: {:?}",
+                    self.id, self.metrics
+                );
+            }
+            // Clear the metrics.
+            self.metrics = WorkerMetrics::default();
+            select_start = Instant::now();
         }
 
         self.clean().await;
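Each loop iteration is timed in two phases: `select_cost` measures how long the worker waited for a wake-up, and `handle_cost` measures the work done after waking, with the whole metrics struct logged when their sum exceeds `slow_threshold`. A stripped-down, synchronous sketch of that two-phase timing (thread sleeps stand in for the async select and the request handling):

```rust
use std::time::{Duration, Instant};

fn main() {
    let slow_threshold = Duration::from_millis(5);
    let mut select_start = Instant::now();

    for iteration in 0..3 {
        // Phase 1: stand-in for waiting in the select! block.
        std::thread::sleep(Duration::from_millis(3));
        let select_cost = select_start.elapsed();

        // Phase 2: stand-in for handling the received requests.
        let handle_start = Instant::now();
        std::thread::sleep(Duration::from_millis(4));
        let handle_cost = handle_start.elapsed();

        if select_cost + handle_cost > slow_threshold {
            println!(
                "iteration {iteration} too slow: select {select_cost:?}, handle {handle_cost:?}"
            );
        }
        // Restart the select timer for the next iteration, as the worker does.
        select_start = Instant::now();
    }
}
```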
@@ -804,7 +881,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
 
         // Handles all write requests first. So we can alter regions without
         // considering existing write requests.
+        let start = Instant::now();
         self.handle_write_requests(write_requests, true).await;
+        self.metrics.handle_write_request_cost = start.elapsed();
 
         self.handle_ddl_requests(ddl_requests).await;
     }
@@ -18,7 +18,7 @@ use store_api::region_request::RegionCompactRequest;
 use store_api::storage::RegionId;
 
 use crate::error::RegionNotFoundSnafu;
-use crate::metrics::COMPACTION_REQUEST_COUNT;
+use crate::metrics::{COMPACTION_REQUEST_COUNT};
 use crate::region::MitoRegionRef;
 use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
 use crate::worker::RegionWorkerLoop;
@@ -16,6 +16,7 @@
 
 use std::collections::{hash_map, HashMap};
 use std::sync::Arc;
+use std::time::Instant;
 
 use api::v1::OpType;
 use common_telemetry::debug;
@@ -43,7 +44,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         }
 
         // Flush this worker if the engine needs to flush.
+        let start = Instant::now();
         self.maybe_flush_worker();
+        self.metrics.flush_worker_cost += start.elapsed();
 
         if self.should_reject_write() {
             // The memory pressure is still too high, reject write requests.
@@ -55,6 +58,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
 
         if self.write_buffer_manager.should_stall() && allow_stall {
             self.stalled_count.add(write_requests.len() as i64);
+            self.metrics.num_stalled_request_added += write_requests.len();
+            self.metrics.num_stall += 1;
+            self.stall_start = Some(Instant::now());
             self.stalled_requests.append(&mut write_requests);
             self.listener.on_write_stall();
             return;
@@ -70,6 +76,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
 
         // Write to WAL.
         {
+            let start = Instant::now();
             let _timer = WRITE_STAGE_ELAPSED
                 .with_label_values(&["write_wal"])
                 .start_timer();
@@ -87,12 +94,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                     let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                     region_ctx.set_next_entry_id(last_entry_id + 1);
                 }
+                self.metrics.write_wal_cost += start.elapsed();
             }
             Err(e) => {
                 // Failed to write wal.
                 for mut region_ctx in region_ctxs.into_values() {
                     region_ctx.set_error(e.clone());
                 }
+                self.metrics.write_wal_cost += start.elapsed();
                 return;
             }
         }
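Note that both match arms add to `write_wal_cost`, so the WAL phase is measured whether the write succeeds or fails, even though the error arm returns early. A small sketch of accumulating one cost across both branches of a fallible step, where `write_wal` is a hypothetical stand-in for the real WAL call:

```rust
use std::time::{Duration, Instant};

/// Hypothetical fallible step standing in for the WAL write.
fn write_wal(batch: usize) -> Result<(), String> {
    if batch == 0 {
        Err("empty batch".to_string())
    } else {
        Ok(())
    }
}

fn main() {
    let mut write_wal_cost = Duration::ZERO;

    for batch in [4usize, 0, 7] {
        let start = Instant::now();
        match write_wal(batch) {
            Ok(()) => {
                // Success path: record the elapsed time before moving on.
                write_wal_cost += start.elapsed();
            }
            Err(e) => {
                // Failure path: still record the elapsed time before bailing out.
                write_wal_cost += start.elapsed();
                eprintln!("failed to write wal: {e}");
            }
        }
    }
    println!("total wal cost: {write_wal_cost:?}");
}
```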
@@ -101,6 +110,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         let (mut put_rows, mut delete_rows) = (0, 0);
         // Write to memtables.
         {
+            let start = Instant::now();
             let _timer = WRITE_STAGE_ELAPSED
                 .with_label_values(&["write_memtable"])
                 .start_timer();
@@ -109,6 +119,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 put_rows += region_ctx.put_num;
                 delete_rows += region_ctx.delete_num;
             }
+            self.metrics.write_memtable_cost += start.elapsed();
         }
         WRITE_ROWS_TOTAL
             .with_label_values(&["put"])
@@ -120,13 +131,19 @@ impl<S: LogStore> RegionWorkerLoop<S> {
 
     /// Handles all stalled write requests.
     pub(crate) async fn handle_stalled_requests(&mut self) {
+        let handle_stall_start = Instant::now();
+        if let Some(start) = self.stall_start.take() {
+            self.metrics.stall_cost += start.elapsed();
+        }
         // Handle stalled requests.
         let stalled = std::mem::take(&mut self.stalled_requests);
         self.stalled_count.sub(stalled.requests.len() as i64);
         // We already stalled these requests, don't stall them again.
         for (_, (_, requests)) in stalled.requests {
+            self.metrics.num_stalled_request_processed += requests.len();
             self.handle_write_requests(requests, false).await;
         }
+        self.metrics.handle_stall_cost += handle_stall_start.elapsed();
     }
 
     /// Rejects all stalled requests.
@@ -149,7 +166,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
     /// Handles a specific region's stalled requests.
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
         debug!("Handles stalled requests for region {}", region_id);
+        if let Some(start) = self.stall_start.take() {
+            self.metrics.stall_cost += start.elapsed();
+        }
         let requests = self.stalled_requests.remove(region_id);
+        self.metrics.num_stalled_request_processed += requests.len();
         self.stalled_count.sub(requests.len() as i64);
         self.handle_write_requests(requests, true).await;
     }
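Both stall handlers close the stall window the same way: `stall_start` is recorded when requests begin to stall, and `Option::take()` reads and clears it in one step, so the elapsed time is added to `stall_cost` at most once per window. A minimal sketch of that start/take pattern outside the worker (the type and method names are illustrative):

```rust
use std::time::{Duration, Instant};

/// Illustrative stall tracker mirroring the worker's `stall_start` / `stall_cost` fields.
#[derive(Default)]
struct StallTracker {
    stall_start: Option<Instant>,
    stall_cost: Duration,
}

impl StallTracker {
    /// Called when write requests start to stall; mirrors `self.stall_start = Some(Instant::now())`.
    fn on_stall(&mut self) {
        self.stall_start = Some(Instant::now());
    }

    /// Called when stalled requests are drained; `take()` clears the start so a
    /// second call does not double-count the same stall window.
    fn on_resume(&mut self) {
        if let Some(start) = self.stall_start.take() {
            self.stall_cost += start.elapsed();
        }
    }
}

fn main() {
    let mut tracker = StallTracker::default();
    tracker.on_stall();
    std::thread::sleep(Duration::from_millis(10));
    tracker.on_resume();
    tracker.on_resume(); // no-op: the window was already closed
    println!("total stall time: {:?}", tracker.stall_cost);
}
```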