Compare commits

...

10 Commits

Author SHA1 Message Date
Weny Xu
8bf772fb50 chore: disable stats persistence by default (#6900)
* chore: disable stats persistence by default

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 21:25:54 +08:00
discord9
9c1240921d feat: flow full aggr only trigger on new data (#6880)
* fix: flow full aggr only trigger on new data

Signed-off-by: discord9 <discord9@163.com>

* chore: better debug msg

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 18:15:17 +08:00
Zhenchi
eb52129a91 fix: move prune_region_dir to region drop (#6891)
* fix: move prune_region_dir to region drop

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Weny Xu
a0a2b40cbe fix: initialize remote WAL regions with correct flushed entry IDs (#6856)
* fix: initialize remote WAL regions with correct flushed entry IDs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: correct latest offset

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: update sqlness

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add replay checkpoint to catchup request

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Weny Xu
067c4458d6 chore: fix typo (#6887)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Ruihang Xia
4e9c31bf5c chore: fix typo (#6885)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Weny Xu
9320a6ddaa chore: update dashboard (#6883)
* chore: update dashboard

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update json

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update json

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add desc

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Zhenchi
4c9fcb7dee fix: prune intermediate dirs on index finish and region pruge (#6878)
* fix: prune intermediate dirs on index finish and region pruge

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Weny Xu
9dc16772fe fix: ignore reserved column IDs and prevent panic on chunk_size is zero (#6882)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
discord9
6ee91f6af4 fix(flow): promql auto create table (#6867)
* fix: non aggr prom ql auto create table

Signed-off-by: discord9 <discord9@163.com>

* feat: val column use any name

Signed-off-by: discord9 <discord9@163.com>

* feat: check if it's tql src table

Signed-off-by: discord9 <discord9@163.com>

* test: check sink table is tql-able

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness redacted

Signed-off-by: discord9 <discord9@163.com>

* fix: sql also handle no aggr case

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
56 changed files with 9036 additions and 6800 deletions

View File

@@ -1,3 +1,8 @@
logging:
level: "info"
format: "json"
filters:
- log_store=debug
meta: meta:
configData: |- configData: |-
[runtime] [runtime]

View File

@@ -402,8 +402,8 @@
| `event_recorder` | -- | -- | Configuration options for the event recorder. | | `event_recorder` | -- | -- | Configuration options for the event recorder. |
| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. | | `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
| `stats_persistence` | -- | -- | Configuration options for the stats persistence. | | `stats_persistence` | -- | -- | Configuration options for the stats persistence. |
| `stats_persistence.ttl` | String | `30d` | TTL for the stats table that will be used to store the stats. Default is `30d`.<br/>Set to `0s` to disable stats persistence. | | `stats_persistence.ttl` | String | `0s` | TTL for the stats table that will be used to store the stats.<br/>Set to `0s` to disable stats persistence.<br/>Default is `0s`.<br/>If you want to enable stats persistence, set the TTL to a value greater than 0.<br/>It is recommended to set a small value, e.g., `3h`. |
| `stats_persistence.interval` | String | `60s` | The interval to persist the stats. Default is `60s`.<br/>The minimum value is `60s`, if the value is less than `60s`, it will be overridden to `60s`. | | `stats_persistence.interval` | String | `10m` | The interval to persist the stats. Default is `10m`.<br/>The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`. |
| `logging` | -- | -- | The logging options. | | `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. | | `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. | | `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |

View File

@@ -274,12 +274,15 @@ ttl = "90d"
## Configuration options for the stats persistence. ## Configuration options for the stats persistence.
[stats_persistence] [stats_persistence]
## TTL for the stats table that will be used to store the stats. Default is `30d`. ## TTL for the stats table that will be used to store the stats.
## Set to `0s` to disable stats persistence. ## Set to `0s` to disable stats persistence.
ttl = "30d" ## Default is `0s`.
## The interval to persist the stats. Default is `60s`. ## If you want to enable stats persistence, set the TTL to a value greater than 0.
## The minimum value is `60s`, if the value is less than `60s`, it will be overridden to `60s`. ## It is recommended to set a small value, e.g., `3h`.
interval = "60s" ttl = "0s"
## The interval to persist the stats. Default is `10m`.
## The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`.
interval = "10m"
## The logging options. ## The logging options.
[logging] [logging]

File diff suppressed because it is too large

View File

@@ -87,6 +87,13 @@
| Other Request P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, scheme, operation) (rate(opendal_operation_duration_seconds_bucket{instance=~"$datanode", operation!~"read\|write\|list\|Writer::write\|Writer::close\|Reader::read"}[$__rate_interval])))` | `timeseries` | Other Request P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` | | Other Request P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, scheme, operation) (rate(opendal_operation_duration_seconds_bucket{instance=~"$datanode", operation!~"read\|write\|list\|Writer::write\|Writer::close\|Reader::read"}[$__rate_interval])))` | `timeseries` | Other Request P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
| Opendal traffic | `sum by(instance, pod, scheme, operation) (rate(opendal_operation_bytes_sum{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Total traffic as in bytes by instance and operation | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` | | Opendal traffic | `sum by(instance, pod, scheme, operation) (rate(opendal_operation_bytes_sum{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Total traffic as in bytes by instance and operation | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
| OpenDAL errors per Instance | `sum by(instance, pod, scheme, operation, error) (rate(opendal_operation_errors_total{instance=~"$datanode", error!="NotFound"}[$__rate_interval]))` | `timeseries` | OpenDAL error counts per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]` | | OpenDAL errors per Instance | `sum by(instance, pod, scheme, operation, error) (rate(opendal_operation_errors_total{instance=~"$datanode", error!="NotFound"}[$__rate_interval]))` | `timeseries` | OpenDAL error counts per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]` |
# Remote WAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Triggered region flush total | `meta_triggered_region_flush_total` | `timeseries` | Triggered region flush total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
| Triggered region checkpoint total | `meta_triggered_region_checkpoint_total` | `timeseries` | Triggered region checkpoint total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
| Topic estimated replay size | `meta_topic_estimated_replay_size` | `timeseries` | Topic estimated max replay size | `prometheus` | `bytes` | `{{pod}}-{{topic_name}}` |
| Kafka logstore's bytes traffic | `rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])` | `timeseries` | Kafka logstore's bytes traffic | `prometheus` | `bytes` | `{{pod}}-{{logstore}}` |
# Metasrv # Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |
@@ -103,6 +110,8 @@
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` | | Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` | | Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` | | DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
# Flownode # Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |

View File

@@ -802,6 +802,48 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]' legendFormat: '[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]'
- title: Remote WAL
panels:
- title: Triggered region flush total
type: timeseries
description: Triggered region flush total
unit: none
queries:
- expr: meta_triggered_region_flush_total
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Triggered region checkpoint total
type: timeseries
description: Triggered region checkpoint total
unit: none
queries:
- expr: meta_triggered_region_checkpoint_total
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Topic estimated replay size
type: timeseries
description: Topic estimated max replay size
unit: bytes
queries:
- expr: meta_topic_estimated_replay_size
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Kafka logstore's bytes traffic
type: timeseries
description: Kafka logstore's bytes traffic
unit: bytes
queries:
- expr: rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{logstore}}'
- title: Metasrv - title: Metasrv
panels: panels:
- title: Region migration datanode - title: Region migration datanode
@@ -948,6 +990,26 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: AlterTable-{{step}} p90 legendFormat: AlterTable-{{step}} p90
- title: Reconciliation stats
type: timeseries
description: Reconciliation stats
unit: s
queries:
- expr: greptime_meta_reconciliation_stats
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{table_type}}-{{type}}'
- title: Reconciliation steps
type: timeseries
description: 'Elapsed of Reconciliation steps '
unit: s
queries:
- expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{procedure_name}}-{{step}}-P90'
- title: Flownode - title: Flownode
panels: panels:
- title: Flow Ingest / Output Rate - title: Flow Ingest / Output Rate

File diff suppressed because it is too large

View File

@@ -87,6 +87,13 @@
| Other Request P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, scheme, operation) (rate(opendal_operation_duration_seconds_bucket{ operation!~"read\|write\|list\|Writer::write\|Writer::close\|Reader::read"}[$__rate_interval])))` | `timeseries` | Other Request P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` | | Other Request P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, scheme, operation) (rate(opendal_operation_duration_seconds_bucket{ operation!~"read\|write\|list\|Writer::write\|Writer::close\|Reader::read"}[$__rate_interval])))` | `timeseries` | Other Request P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
| Opendal traffic | `sum by(instance, pod, scheme, operation) (rate(opendal_operation_bytes_sum{}[$__rate_interval]))` | `timeseries` | Total traffic as in bytes by instance and operation | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` | | Opendal traffic | `sum by(instance, pod, scheme, operation) (rate(opendal_operation_bytes_sum{}[$__rate_interval]))` | `timeseries` | Total traffic as in bytes by instance and operation | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
| OpenDAL errors per Instance | `sum by(instance, pod, scheme, operation, error) (rate(opendal_operation_errors_total{ error!="NotFound"}[$__rate_interval]))` | `timeseries` | OpenDAL error counts per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]` | | OpenDAL errors per Instance | `sum by(instance, pod, scheme, operation, error) (rate(opendal_operation_errors_total{ error!="NotFound"}[$__rate_interval]))` | `timeseries` | OpenDAL error counts per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]` |
# Remote WAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Triggered region flush total | `meta_triggered_region_flush_total` | `timeseries` | Triggered region flush total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
| Triggered region checkpoint total | `meta_triggered_region_checkpoint_total` | `timeseries` | Triggered region checkpoint total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
| Topic estimated replay size | `meta_topic_estimated_replay_size` | `timeseries` | Topic estimated max replay size | `prometheus` | `bytes` | `{{pod}}-{{topic_name}}` |
| Kafka logstore's bytes traffic | `rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])` | `timeseries` | Kafka logstore's bytes traffic | `prometheus` | `bytes` | `{{pod}}-{{logstore}}` |
# Metasrv # Metasrv
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |
@@ -103,6 +110,8 @@
| Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` | | Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` | | Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` | | DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
# Flownode # Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format | | Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- | | --- | --- | --- | --- | --- | --- | --- |

View File

@@ -802,6 +802,48 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]' legendFormat: '[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]'
- title: Remote WAL
panels:
- title: Triggered region flush total
type: timeseries
description: Triggered region flush total
unit: none
queries:
- expr: meta_triggered_region_flush_total
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Triggered region checkpoint total
type: timeseries
description: Triggered region checkpoint total
unit: none
queries:
- expr: meta_triggered_region_checkpoint_total
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Topic estimated replay size
type: timeseries
description: Topic estimated max replay size
unit: bytes
queries:
- expr: meta_topic_estimated_replay_size
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{topic_name}}'
- title: Kafka logstore's bytes traffic
type: timeseries
description: Kafka logstore's bytes traffic
unit: bytes
queries:
- expr: rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{logstore}}'
- title: Metasrv - title: Metasrv
panels: panels:
- title: Region migration datanode - title: Region migration datanode
@@ -948,6 +990,26 @@ groups:
type: prometheus type: prometheus
uid: ${metrics} uid: ${metrics}
legendFormat: AlterTable-{{step}} p90 legendFormat: AlterTable-{{step}} p90
- title: Reconciliation stats
type: timeseries
description: Reconciliation stats
unit: s
queries:
- expr: greptime_meta_reconciliation_stats
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{pod}}-{{table_type}}-{{type}}'
- title: Reconciliation steps
type: timeseries
description: 'Elapsed of Reconciliation steps '
unit: s
queries:
- expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '{{procedure_name}}-{{step}}-P90'
- title: Flownode - title: Flownode
panels: panels:
- title: Flow Ingest / Output Rate - title: Flow Ingest / Output Rate

View File

@@ -251,7 +251,6 @@ macro_rules! define_from_tonic_status {
.get(key) .get(key)
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok()) .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
} }
let code = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_CODE) let code = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_CODE)
.and_then(|s| { .and_then(|s| {
if let Ok(code) = s.parse::<u32>() { if let Ok(code) = s.parse::<u32>() {

View File

@@ -108,10 +108,6 @@ pub struct OpenRegion {
pub region_wal_options: HashMap<RegionNumber, String>, pub region_wal_options: HashMap<RegionNumber, String>,
#[serde(default)] #[serde(default)]
pub skip_wal_replay: bool, pub skip_wal_replay: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replay_entry_id: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata_replay_entry_id: Option<u64>,
} }
impl OpenRegion { impl OpenRegion {
@@ -128,22 +124,8 @@ impl OpenRegion {
region_options, region_options,
region_wal_options, region_wal_options,
skip_wal_replay, skip_wal_replay,
replay_entry_id: None,
metadata_replay_entry_id: None,
} }
} }
/// Sets the replay entry id.
pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
self.replay_entry_id = replay_entry_id;
self
}
/// Sets the metadata replay entry id.
pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
self.metadata_replay_entry_id = metadata_replay_entry_id;
self
}
} }
/// The instruction of downgrading leader region. /// The instruction of downgrading leader region.
@@ -169,7 +151,7 @@ impl Display for DowngradeRegion {
} }
/// Upgrades a follower region to leader region. /// Upgrades a follower region to leader region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct UpgradeRegion { pub struct UpgradeRegion {
/// The [RegionId]. /// The [RegionId].
pub region_id: RegionId, pub region_id: RegionId,
@@ -186,6 +168,24 @@ pub struct UpgradeRegion {
/// The hint for replaying memtable. /// The hint for replaying memtable.
#[serde(default)] #[serde(default)]
pub location_id: Option<u64>, pub location_id: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replay_entry_id: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata_replay_entry_id: Option<u64>,
}
impl UpgradeRegion {
/// Sets the replay entry id.
pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
self.replay_entry_id = replay_entry_id;
self
}
/// Sets the metadata replay entry id.
pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
self.metadata_replay_entry_id = metadata_replay_entry_id;
self
}
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -370,8 +370,6 @@ mod tests {
region_options, region_options,
region_wal_options: HashMap::new(), region_wal_options: HashMap::new(),
skip_wal_replay: false, skip_wal_replay: false,
replay_entry_id: None,
metadata_replay_entry_id: None,
}; };
assert_eq!(expected, deserialized); assert_eq!(expected, deserialized);
} }

View File

@@ -46,7 +46,7 @@ pub struct TopicRegionValue {
pub checkpoint: Option<ReplayCheckpoint>, pub checkpoint: Option<ReplayCheckpoint>,
} }
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplayCheckpoint { pub struct ReplayCheckpoint {
#[serde(default)] #[serde(default)]
pub entry_id: u64, pub entry_id: u64,

View File

@@ -24,6 +24,7 @@ use datatypes::schema::ColumnSchema;
use futures::future::{join_all, try_join_all}; use futures::future::{join_all, try_join_all};
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionId, TableId}; use store_api::storage::{RegionId, TableId};
use table::metadata::{RawTableInfo, RawTableMeta}; use table::metadata::{RawTableInfo, RawTableMeta};
use table::table_name::TableName; use table::table_name::TableName;
@@ -384,6 +385,7 @@ pub(crate) fn build_table_meta_from_column_metadatas(
*next_column_id = column_ids *next_column_id = column_ids
.iter() .iter()
.filter(|id| !ReservedColumnId::is_reserved(**id))
.max() .max()
.map(|max| max + 1) .map(|max| max + 1)
.unwrap_or(*next_column_id) .unwrap_or(*next_column_id)
@@ -1039,9 +1041,13 @@ mod tests {
fn test_build_table_info_from_column_metadatas() { fn test_build_table_info_from_column_metadatas() {
let mut column_metadatas = new_test_column_metadatas(); let mut column_metadatas = new_test_column_metadatas();
column_metadatas.push(ColumnMetadata { column_metadatas.push(ColumnMetadata {
column_schema: ColumnSchema::new("col3", ConcreteDataType::string_datatype(), true), column_schema: ColumnSchema::new(
"__table_id",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag, semantic_type: SemanticType::Tag,
column_id: 3, column_id: ReservedColumnId::table_id(),
}); });
let table_id = 1; let table_id = 1;
@@ -1066,8 +1072,11 @@ mod tests {
assert_eq!(new_table_meta.partition_key_indices, vec![2]); assert_eq!(new_table_meta.partition_key_indices, vec![2]);
assert_eq!(new_table_meta.value_indices, vec![1, 2]); assert_eq!(new_table_meta.value_indices, vec![1, 2]);
assert_eq!(new_table_meta.schema.timestamp_index, Some(1)); assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]); assert_eq!(
assert_eq!(new_table_meta.next_column_id, 4); new_table_meta.column_ids,
vec![0, 1, 2, ReservedColumnId::table_id()]
);
assert_eq!(new_table_meta.next_column_id, table_meta.next_column_id);
} }
#[test] #[test]

View File

@@ -238,10 +238,7 @@ mod tests {
// Upgrade region // Upgrade region
let instruction = Instruction::UpgradeRegion(UpgradeRegion { let instruction = Instruction::UpgradeRegion(UpgradeRegion {
region_id, region_id,
last_entry_id: None, ..Default::default()
metadata_last_entry_id: None,
replay_timeout: None,
location_id: None,
}); });
assert!( assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction))) heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))

View File

@@ -16,7 +16,7 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_options_allocator::prepare_wal_options; use common_meta::wal_options_allocator::prepare_wal_options;
use futures_util::future::BoxFuture; use futures_util::future::BoxFuture;
use store_api::path_utils::table_dir; use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest, ReplayCheckpoint}; use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
use crate::heartbeat::handler::HandlerContext; use crate::heartbeat::handler::HandlerContext;
@@ -29,31 +29,18 @@ impl HandlerContext {
mut region_options, mut region_options,
region_wal_options, region_wal_options,
skip_wal_replay, skip_wal_replay,
replay_entry_id,
metadata_replay_entry_id,
}: OpenRegion, }: OpenRegion,
) -> BoxFuture<'static, Option<InstructionReply>> { ) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move { Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident); let region_id = Self::region_ident_to_region_id(&region_ident);
prepare_wal_options(&mut region_options, region_id, &region_wal_options); prepare_wal_options(&mut region_options, region_id, &region_wal_options);
let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
(Some(replay_entry_id), Some(metadata_replay_entry_id)) => Some(ReplayCheckpoint {
entry_id: replay_entry_id,
metadata_entry_id: Some(metadata_replay_entry_id),
}),
(Some(replay_entry_id), None) => Some(ReplayCheckpoint {
entry_id: replay_entry_id,
metadata_entry_id: None,
}),
_ => None,
};
let request = RegionRequest::Open(RegionOpenRequest { let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine, engine: region_ident.engine,
table_dir: table_dir(&region_storage_path, region_id.table_id()), table_dir: table_dir(&region_storage_path, region_id.table_id()),
path_type: PathType::Bare, path_type: PathType::Bare,
options: region_options, options: region_options,
skip_wal_replay, skip_wal_replay,
checkpoint, checkpoint: None,
}); });
let result = self.region_server.handle_request(region_id, request).await; let result = self.region_server.handle_request(region_id, request).await;
let success = result.is_ok(); let success = result.is_ok();

View File

@@ -15,7 +15,7 @@
use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply}; use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_telemetry::{info, warn}; use common_telemetry::{info, warn};
use futures_util::future::BoxFuture; use futures_util::future::BoxFuture;
use store_api::region_request::{RegionCatchupRequest, RegionRequest}; use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
use crate::heartbeat::handler::HandlerContext; use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::task_tracker::WaitResult; use crate::heartbeat::task_tracker::WaitResult;
@@ -29,6 +29,8 @@ impl HandlerContext {
metadata_last_entry_id, metadata_last_entry_id,
replay_timeout, replay_timeout,
location_id, location_id,
replay_entry_id,
metadata_replay_entry_id,
}: UpgradeRegion, }: UpgradeRegion,
) -> BoxFuture<'static, Option<InstructionReply>> { ) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move { Box::pin(async move {
@@ -50,6 +52,14 @@ impl HandlerContext {
let region_server_moved = self.region_server.clone(); let region_server_moved = self.region_server.clone();
let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
(Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
entry_id,
metadata_entry_id,
}),
_ => None,
};
// The catchup task is almost zero cost if the inside region is writable. // The catchup task is almost zero cost if the inside region is writable.
// Therefore, it always registers a new catchup task. // Therefore, it always registers a new catchup task.
let register_result = self let register_result = self
@@ -66,6 +76,7 @@ impl HandlerContext {
entry_id: last_entry_id, entry_id: last_entry_id,
metadata_entry_id: metadata_last_entry_id, metadata_entry_id: metadata_last_entry_id,
location_id, location_id,
checkpoint,
}), }),
) )
.await?; .await?;
@@ -148,10 +159,8 @@ mod tests {
.clone() .clone()
.handle_upgrade_region_instruction(UpgradeRegion { .handle_upgrade_region_instruction(UpgradeRegion {
region_id, region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout, replay_timeout,
location_id: None, ..Default::default()
}) })
.await; .await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -187,10 +196,8 @@ mod tests {
.clone() .clone()
.handle_upgrade_region_instruction(UpgradeRegion { .handle_upgrade_region_instruction(UpgradeRegion {
region_id, region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout, replay_timeout,
location_id: None, ..Default::default()
}) })
.await; .await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -227,10 +234,8 @@ mod tests {
.clone() .clone()
.handle_upgrade_region_instruction(UpgradeRegion { .handle_upgrade_region_instruction(UpgradeRegion {
region_id, region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout, replay_timeout,
location_id: None, ..Default::default()
}) })
.await; .await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -271,9 +276,7 @@ mod tests {
.handle_upgrade_region_instruction(UpgradeRegion { .handle_upgrade_region_instruction(UpgradeRegion {
region_id, region_id,
replay_timeout, replay_timeout,
last_entry_id: None, ..Default::default()
metadata_last_entry_id: None,
location_id: None,
}) })
.await; .await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -289,10 +292,8 @@ mod tests {
let reply = handler_context let reply = handler_context
.handle_upgrade_region_instruction(UpgradeRegion { .handle_upgrade_region_instruction(UpgradeRegion {
region_id, region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: Some(Duration::from_millis(500)), replay_timeout: Some(Duration::from_millis(500)),
location_id: None, ..Default::default()
}) })
.await; .await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -332,10 +333,7 @@ mod tests {
.clone() .clone()
.handle_upgrade_region_instruction(UpgradeRegion { .handle_upgrade_region_instruction(UpgradeRegion {
region_id, region_id,
last_entry_id: None, ..Default::default()
metadata_last_entry_id: None,
replay_timeout: None,
location_id: None,
}) })
.await; .await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -351,10 +349,8 @@ mod tests {
.clone() .clone()
.handle_upgrade_region_instruction(UpgradeRegion { .handle_upgrade_region_instruction(UpgradeRegion {
region_id, region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: Some(Duration::from_millis(200)), replay_timeout: Some(Duration::from_millis(200)),
location_id: None, ..Default::default()
}) })
.await; .await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
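The two handler hunks above move the replay checkpoint from region open to region catchup: `OpenRegion` no longer carries replay entry IDs, while `UpgradeRegion` builds a `ReplayCheckpoint` only when a data replay entry ID is present and passes the metadata replay entry ID through unchanged. A minimal standalone sketch of that rule, using a hypothetical stand-in type rather than the real `store_api::region_request::ReplayCheckpoint`:

```rust
// Stand-in type for illustration only; the real struct lives in store_api::region_request.
#[derive(Debug, PartialEq)]
struct ReplayCheckpoint {
    entry_id: u64,
    metadata_entry_id: Option<u64>,
}

// A checkpoint exists only when the data replay entry id is known;
// the metadata replay entry id is optional and carried along as-is.
fn build_checkpoint(replay: Option<u64>, metadata_replay: Option<u64>) -> Option<ReplayCheckpoint> {
    replay.map(|entry_id| ReplayCheckpoint {
        entry_id,
        metadata_entry_id: metadata_replay,
    })
}

fn main() {
    // No data replay entry id: no checkpoint, even if a metadata id exists.
    assert_eq!(build_checkpoint(None, Some(7)), None);
    // Data replay entry id present: checkpoint is built.
    assert_eq!(
        build_checkpoint(Some(42), None),
        Some(ReplayCheckpoint { entry_id: 42, metadata_entry_id: None })
    );
}
```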

View File

@@ -29,10 +29,15 @@ use common_runtime::JoinHandle;
use common_telemetry::tracing::warn; use common_telemetry::tracing::warn;
use common_telemetry::{debug, info}; use common_telemetry::{debug, info};
use common_time::TimeToLive; use common_time::TimeToLive;
use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use query::QueryEngineRef; use query::QueryEngineRef;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use sql::parsers::utils::is_tql; use sql::parsers::utils::is_tql;
use store_api::storage::{RegionId, TableId}; use store_api::storage::{RegionId, TableId};
use table::table_reference::TableReference;
use tokio::sync::{oneshot, RwLock}; use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient; use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,8 +47,8 @@ use crate::batching_mode::utils::sql_to_df_plan;
use crate::batching_mode::BatchingModeOptions; use crate::batching_mode::BatchingModeOptions;
use crate::engine::FlowEngine; use crate::engine::FlowEngine;
use crate::error::{ use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, InvalidQuerySnafu, CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
}; };
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW; use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName}; use crate::{CreateFlowArgs, Error, FlowId, TableName};
@@ -151,9 +156,11 @@ impl BatchingEngine {
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move { let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names; let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = HashSet::new(); let mut all_dirty_windows = HashSet::new();
let mut is_dirty = false;
for src_table_name in src_table_names { for src_table_name in src_table_names {
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) { if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else { let Some(expr) = &task.config.time_window_expr else {
is_dirty = true;
continue; continue;
}; };
for timestamp in timestamps { for timestamp in timestamps {
@@ -168,6 +175,9 @@ impl BatchingEngine {
} }
} }
let mut state = task.state.write().unwrap(); let mut state = task.state.write().unwrap();
if is_dirty {
state.dirty_time_windows.set_dirty();
}
let flow_id_label = task.config.flow_id.to_string(); let flow_id_label = task.config.flow_id.to_string();
for timestamp in all_dirty_windows { for timestamp in all_dirty_windows {
state.dirty_time_windows.add_window(timestamp, None); state.dirty_time_windows.add_window(timestamp, None);
@@ -269,9 +279,12 @@ impl BatchingEngine {
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move { let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names; let src_table_names = &task.config.source_table_names;
let mut is_dirty = false;
for src_table_name in src_table_names { for src_table_name in src_table_names {
if let Some(entry) = group_by_table_name.get(src_table_name) { if let Some(entry) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else { let Some(expr) = &task.config.time_window_expr else {
is_dirty = true;
continue; continue;
}; };
let involved_time_windows = expr.handle_rows(entry.clone()).await?; let involved_time_windows = expr.handle_rows(entry.clone()).await?;
@@ -281,6 +294,10 @@ impl BatchingEngine {
.add_lower_bounds(involved_time_windows.into_iter()); .add_lower_bounds(involved_time_windows.into_iter());
} }
} }
if is_dirty {
task.state.write().unwrap().dirty_time_windows.set_dirty();
}
Ok(()) Ok(())
}); });
handles.push(handle); handles.push(handle);
@@ -370,13 +387,12 @@ impl BatchingEngine {
} }
})?; })?;
let query_ctx = Arc::new(query_ctx); let query_ctx = Arc::new(query_ctx);
let is_tql = is_tql(query_ctx.sql_dialect(), &sql)
.map_err(BoxedError::new)
.context(CreateFlowSnafu { sql: &sql })?;
// optionally set a eval interval for the flow // optionally set a eval interval for the flow
if eval_interval.is_none() if eval_interval.is_none() && is_tql {
&& is_tql(query_ctx.sql_dialect(), &sql)
.map_err(BoxedError::new)
.context(CreateFlowSnafu { sql: &sql })?
{
InvalidQuerySnafu { InvalidQuerySnafu {
reason: "TQL query requires EVAL INTERVAL to be set".to_string(), reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
} }
@@ -418,6 +434,11 @@ impl BatchingEngine {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?; let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
if is_tql {
self.check_is_tql_table(&plan, &query_ctx).await?;
}
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr( let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
&plan, &plan,
self.query_engine.engine_state().catalog_manager().clone(), self.query_engine.engine_state().catalog_manager().clone(),
@@ -484,6 +505,131 @@ impl BatchingEngine {
Ok(Some(flow_id)) Ok(Some(flow_id))
} }
async fn check_is_tql_table(
&self,
query: &LogicalPlan,
query_ctx: &QueryContext,
) -> Result<(), Error> {
struct CollectTableRef {
table_refs: HashSet<datafusion_common::TableReference>,
}
impl TreeNodeVisitor<'_> for CollectTableRef {
type Node = LogicalPlan;
fn f_down(
&mut self,
node: &Self::Node,
) -> datafusion_common::Result<TreeNodeRecursion> {
if let LogicalPlan::TableScan(scan) = node {
self.table_refs.insert(scan.table_name.clone());
}
Ok(TreeNodeRecursion::Continue)
}
}
let mut table_refs = CollectTableRef {
table_refs: HashSet::new(),
};
query
.visit_with_subqueries(&mut table_refs)
.context(DatafusionSnafu {
context: "Checking if all source tables are TQL tables",
})?;
let default_catalog = query_ctx.current_catalog();
let default_schema = query_ctx.current_schema();
let default_schema = &default_schema;
for table_ref in table_refs.table_refs {
let table_ref = match &table_ref {
datafusion_common::TableReference::Bare { table } => {
TableReference::full(default_catalog, default_schema, table)
}
datafusion_common::TableReference::Partial { schema, table } => {
TableReference::full(default_catalog, schema, table)
}
datafusion_common::TableReference::Full {
catalog,
schema,
table,
} => TableReference::full(catalog, schema, table),
};
let table_id = self
.table_meta
.table_name_manager()
.get(table_ref.into())
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| UnexpectedSnafu {
reason: format!("Failed to get table id for table: {}", table_ref),
})?
.table_id();
let table_info =
get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
// first check if it's only one f64 value column
let value_cols = table_info
.table_info
.meta
.schema
.column_schemas
.iter()
.filter(|col| col.data_type == ConcreteDataType::float64_datatype())
.collect::<Vec<_>>();
ensure!(
value_cols.len() == 1,
InvalidQuerySnafu {
reason: format!(
"TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
table_ref,
table_id,
value_cols.len(),
value_cols
),
}
);
// TODO(discord9): do need to check rest columns is string and is tag column?
let pk_idxs = table_info
.table_info
.meta
.primary_key_indices
.iter()
.collect::<HashSet<_>>();
for (idx, col) in table_info
.table_info
.meta
.schema
.column_schemas
.iter()
.enumerate()
{
// three cases:
// 1. val column
// 2. timestamp column
// 3. tag column (string)
let is_pk: bool = pk_idxs.contains(&&idx);
ensure!(
col.data_type == ConcreteDataType::float64_datatype()
|| col.data_type.is_timestamp()
|| (col.data_type == ConcreteDataType::string_datatype() && is_pk),
InvalidQuerySnafu {
reason: format!(
"TQL query only supports f64 value column, timestamp column and string tag columns, table `{}`(id={}) has column `{}` with type {:?} which is not supported",
table_ref,
table_id,
col.name,
col.data_type
),
}
);
}
}
Ok(())
}
pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> { pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() { if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks"); warn!("Flow {flow_id} not found in tasks");

View File

@@ -203,11 +203,21 @@ impl DirtyTimeWindows {
self.windows.clear(); self.windows.clear();
} }
/// Set windows to be dirty, only useful for full aggr without time window
/// to mark some new data is inserted
pub fn set_dirty(&mut self) {
self.windows.insert(Timestamp::new_second(0), None);
}
/// Number of dirty windows. /// Number of dirty windows.
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.windows.len() self.windows.len()
} }
pub fn is_empty(&self) -> bool {
self.windows.is_empty()
}
/// Get the effective count of time windows, which is the number of time windows that can be /// Get the effective count of time windows, which is the number of time windows that can be
/// used for query, compute from total time window range divided by `window_size`. /// used for query, compute from total time window range divided by `window_size`.
pub fn effective_count(&self, window_size: &Duration) -> usize { pub fn effective_count(&self, window_size: &Duration) -> usize {
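The `set_dirty`/`is_empty` pair added above lets a full aggregation without a time-window expression record that new data arrived: inserts set a sentinel dirty window, and the batching task skips recomputation when nothing is dirty (see the `is_dirty` checks in the engine and task hunks). A minimal standalone model of that marker, as a simplified stand-in for the real `DirtyTimeWindows`:

```rust
use std::collections::BTreeMap;

// Simplified stand-in for the real `DirtyTimeWindows` (which keys by `Timestamp`);
// here the key is just seconds since epoch.
#[derive(Default, Debug)]
struct DirtyTimeWindows {
    windows: BTreeMap<i64, Option<i64>>,
}

impl DirtyTimeWindows {
    /// Mark that new data arrived for a flow without a time-window expression.
    fn set_dirty(&mut self) {
        // A sentinel zero-timestamp window is enough to make `is_empty()` return false.
        self.windows.insert(0, None);
    }

    fn is_empty(&self) -> bool {
        self.windows.is_empty()
    }

    fn clean(&mut self) {
        self.windows.clear();
    }
}

fn main() {
    let mut dirty = DirtyTimeWindows::default();
    assert!(dirty.is_empty()); // no new data: the full aggregation can be skipped
    dirty.set_dirty();         // rows were inserted into a source table
    assert!(!dirty.is_empty());
    dirty.clean();             // the query ran; clear the marker
    assert!(dirty.is_empty());
}
```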

View File

@@ -48,8 +48,8 @@ use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{FilterExprInfo, TaskState}; use crate::batching_mode::state::{FilterExprInfo, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{ use crate::batching_mode::utils::{
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter, gen_plan_with_matching_schema, get_table_info_df_schema, sql_to_df_plan, AddFilterRewriter,
FindGroupByFinalName, ColumnMatcherRewriter, FindGroupByFinalName,
}; };
use crate::batching_mode::BatchingModeOptions; use crate::batching_mode::BatchingModeOptions;
use crate::df_optimizer::apply_df_optimizer; use crate::df_optimizer::apply_df_optimizer;
@@ -618,41 +618,62 @@ impl BatchingTask {
.map(|expr| expr.eval(low_bound)) .map(|expr| expr.eval(low_bound))
.transpose()?; .transpose()?;
let (Some((Some(l), Some(u))), QueryType::Sql) = let (expire_lower_bound, expire_upper_bound) =
(expire_time_window_bound, &self.config.query_type) match (expire_time_window_bound, &self.config.query_type) {
else { (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
// either no time window or not a sql query, then just use the original query (None, QueryType::Sql) => {
// if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns)
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
debug!( debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id "Flow id = {:?}, no time window, using the same query",
self.config.flow_id
); );
// clean dirty time window too, this could be from create flow's check_execute // clean dirty time window too, this could be from create flow's check_execute
let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
self.state.write().unwrap().dirty_time_windows.clean(); self.state.write().unwrap().dirty_time_windows.clean();
// TODO(discord9): not add auto column for tql query? if !is_dirty {
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone()); // no dirty data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
return Ok(None);
}
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false) let plan = gen_plan_with_matching_schema(
&self.config.query,
query_ctx,
engine,
sink_table_schema.clone(),
)
.await?; .await?;
let plan = plan
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
// since no time window lower/upper bound is found, just return the original query(with auto columns)
return Ok(Some(PlanInfo { plan, filter: None })); return Ok(Some(PlanInfo { plan, filter: None }));
}
_ => {
// clean for tql have no use for time window
self.state.write().unwrap().dirty_time_windows.clean();
let plan = gen_plan_with_matching_schema(
&self.config.query,
query_ctx,
engine,
sink_table_schema.clone(),
)
.await?;
return Ok(Some(PlanInfo { plan, filter: None }));
}
}; };
debug!( debug!(
"Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}", "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
self.config.flow_id, l, u, self.state.read().unwrap().dirty_time_windows self.config.flow_id, expire_lower_bound, expire_upper_bound, self.state.read().unwrap().dirty_time_windows
); );
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu { let window_size = expire_upper_bound
reason: format!("Can't get window size from {u:?} - {l:?}"), .sub(&expire_lower_bound)
.with_context(|| UnexpectedSnafu {
reason: format!(
"Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
),
})?; })?;
let col_name = self let col_name = self
.config .config
@@ -673,7 +694,7 @@ impl BatchingTask {
.dirty_time_windows .dirty_time_windows
.gen_filter_exprs( .gen_filter_exprs(
&col_name, &col_name,
Some(l), Some(expire_lower_bound),
window_size, window_size,
max_window_cnt max_window_cnt
.unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query), .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
@@ -701,7 +722,7 @@ impl BatchingTask {
}; };
let mut add_filter = AddFilterRewriter::new(expr.expr.clone()); let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone()); let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema.clone());
let plan = let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?; sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
@@ -732,7 +753,25 @@ fn create_table_with_expr(
sink_table_name: &[String; 3], sink_table_name: &[String; 3],
query_type: &QueryType, query_type: &QueryType,
) -> Result<CreateTableExpr, Error> { ) -> Result<CreateTableExpr, Error> {
let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan)?; let table_def = match query_type {
&QueryType::Sql => {
if let Some(def) = build_pk_from_aggr(plan)? {
def
} else {
build_by_sql_schema(plan)?
}
}
QueryType::Tql => {
// first try build from aggr, then from tql schema because tql query might not have aggr node
if let Some(table_def) = build_pk_from_aggr(plan)? {
table_def
} else {
build_by_tql_schema(plan)?
}
}
};
let first_time_stamp = table_def.ts_col;
let primary_keys = table_def.pks;
let mut column_schemas = Vec::new(); let mut column_schemas = Vec::new();
for field in plan.schema().fields() { for field in plan.schema().fields() {
@@ -755,7 +794,7 @@ fn create_table_with_expr(
let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name); let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
if is_val_column { if is_val_column {
let col_schema = let col_schema =
ColumnSchema::new("val", ConcreteDataType::float64_datatype(), true); ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
column_schemas.push(col_schema); column_schemas.push(col_schema);
} else if is_tag_column { } else if is_tag_column {
let col_schema = let col_schema =
@@ -809,15 +848,63 @@ fn create_table_with_expr(
}) })
} }
/// simply build by schema, return first timestamp column and no primary key
fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
Some(f.name().clone())
} else {
None
}
});
Ok(TableDef {
ts_col: first_time_stamp,
pks: vec![],
})
}
/// Return first timestamp column found in output schema and all string columns
fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
Some(f.name().clone())
} else {
None
}
});
let string_columns = plan
.schema()
.fields()
.iter()
.filter_map(|f| {
if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
Some(f.name().clone())
} else {
None
}
})
.collect::<Vec<_>>();
Ok(TableDef {
ts_col: first_time_stamp,
pks: string_columns,
})
}
struct TableDef {
ts_col: Option<String>,
pks: Vec<String>,
}
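To make the schema-driven fallbacks above concrete: both helpers scan the query's output schema, take the first timestamp column as the time index, and (for TQL) treat the string columns as primary keys. A standalone sketch of that classification, assuming only the arrow crate rather than DataFusion's DFSchema used in the real code:

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};

/// Hypothetical helper mirroring the selection rules above.
fn infer_table_def(schema: &Schema) -> (Option<String>, Vec<String>) {
    let ts_col = schema
        .fields()
        .iter()
        .find(|f| matches!(f.data_type(), DataType::Timestamp(_, _)))
        .map(|f| f.name().clone());
    let pks = schema
        .fields()
        .iter()
        .filter(|f| matches!(f.data_type(), DataType::Utf8 | DataType::LargeUtf8))
        .map(|f| f.name().clone())
        .collect();
    (ts_col, pks)
}

fn main() {
    let schema = Schema::new(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("host", DataType::Utf8, true),
        Field::new("val", DataType::Float64, true),
    ]);
    let (ts, pks) = infer_table_def(&schema);
    assert_eq!(ts.as_deref(), Some("ts"));
    assert_eq!(pks, vec!["host".to_string()]);
}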
/// Return first timestamp column which is in group by clause and other columns which are also in group by clause /// Return first timestamp column which is in group by clause and other columns which are also in group by clause
/// ///
/// # Returns /// # Returns
/// ///
/// * `Option<String>` - first timestamp column which is in group by clause /// * `Option<String>` - first timestamp column which is in group by clause
/// * `Vec<String>` - other columns which are also in group by clause /// * `Vec<String>` - other columns which are also in group by clause
fn build_primary_key_constraint( ///
plan: &LogicalPlan, /// if no aggregation found, return None
) -> Result<(Option<String>, Vec<String>), Error> { fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
let fields = plan.schema().fields(); let fields = plan.schema().fields();
let mut pk_names = FindGroupByFinalName::default(); let mut pk_names = FindGroupByFinalName::default();
@@ -827,13 +914,18 @@ fn build_primary_key_constraint(
})?; })?;
// if no group by clause, return empty with first timestamp column found in output schema // if no group by clause, return empty with first timestamp column found in output schema
let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default(); let Some(pk_final_names) = pk_names.get_group_expr_names() else {
return Ok(None);
};
if pk_final_names.is_empty() { if pk_final_names.is_empty() {
let first_ts_col = fields let first_ts_col = fields
.iter() .iter()
.find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()) .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
.map(|f| f.name().clone()); .map(|f| f.name().clone());
return Ok((first_ts_col, Vec::new())); return Ok(Some(TableDef {
ts_col: first_ts_col,
pks: vec![],
}));
} }
let all_pk_cols: Vec<_> = fields let all_pk_cols: Vec<_> = fields
@@ -855,7 +947,10 @@ fn build_primary_key_constraint(
.filter(|col| first_time_stamp != Some(col.to_string())) .filter(|col| first_time_stamp != Some(col.to_string()))
.collect(); .collect();
Ok((first_time_stamp, all_pk_cols)) Ok(Some(TableDef {
ts_col: first_time_stamp,
pks: all_pk_cols,
}))
} }
#[cfg(test)] #[cfg(test)]


@@ -24,7 +24,7 @@ use datafusion::error::Result as DfResult;
use datafusion::logical_expr::Expr; use datafusion::logical_expr::Expr;
use datafusion::sql::unparser::Unparser; use datafusion::sql::unparser::Unparser;
use datafusion_common::tree_node::{ use datafusion_common::tree_node::{
Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor, Transformed, TreeNode as _, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
}; };
use datafusion_common::{DFSchema, DataFusionError, ScalarValue}; use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
use datafusion_expr::{Distinct, LogicalPlan, Projection}; use datafusion_expr::{Distinct, LogicalPlan, Projection};
@@ -135,6 +135,27 @@ pub async fn sql_to_df_plan(
Ok(plan) Ok(plan)
} }
/// Generate a plan that matches the schema of the sink table
/// from given sql by alias and adding auto columns
pub(crate) async fn gen_plan_with_matching_schema(
sql: &str,
query_ctx: QueryContextRef,
engine: QueryEngineRef,
sink_table_schema: SchemaRef,
) -> Result<LogicalPlan, Error> {
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), sql, false).await?;
let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema);
let plan = plan
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
Ok(plan)
}
pub fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> { pub fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
/// A dialect that forces identifiers to be quoted when have uppercase /// A dialect that forces identifiers to be quoted when have uppercase
struct ForceQuoteIdentifiers; struct ForceQuoteIdentifiers;
@@ -239,19 +260,19 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
} }
} }
/// Add to the final select columns like `update_at` /// Optionally add to the final select columns like `update_at` if the sink table has such column
/// (which doesn't necessary need to have exact name just need to be a extra timestamp column) /// (which doesn't necessary need to have exact name just need to be a extra timestamp column)
/// and `__ts_placeholder`(this column need to have exact this name and be a timestamp) /// and `__ts_placeholder`(this column need to have exact this name and be a timestamp)
/// with values like `now()` and `0` /// with values like `now()` and `0`
/// ///
/// it also give existing columns alias to column in sink table if needed /// it also give existing columns alias to column in sink table if needed
#[derive(Debug)] #[derive(Debug)]
pub struct AddAutoColumnRewriter { pub struct ColumnMatcherRewriter {
pub schema: SchemaRef, pub schema: SchemaRef,
pub is_rewritten: bool, pub is_rewritten: bool,
} }
impl AddAutoColumnRewriter { impl ColumnMatcherRewriter {
pub fn new(schema: SchemaRef) -> Self { pub fn new(schema: SchemaRef) -> Self {
Self { Self {
schema, schema,
@@ -348,7 +369,7 @@ impl AddAutoColumnRewriter {
} }
} }
impl TreeNodeRewriter for AddAutoColumnRewriter { impl TreeNodeRewriter for ColumnMatcherRewriter {
type Node = LogicalPlan; type Node = LogicalPlan;
fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> { fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if self.is_rewritten { if self.is_rewritten {
@@ -696,7 +717,7 @@ mod test {
let ctx = QueryContext::arc(); let ctx = QueryContext::arc();
for (before, after, column_schemas) in testcases { for (before, after, column_schemas) in testcases {
let schema = Arc::new(Schema::new(column_schemas)); let schema = Arc::new(Schema::new(column_schemas));
let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(schema); let mut add_auto_column_rewriter = ColumnMatcherRewriter::new(schema);
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false) let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false)
.await .await
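The renamed ColumnMatcherRewriter (and the gen_plan_with_matching_schema helper above) lines the query's output up with the sink table: columns the query does not produce, such as an extra update_at-style timestamp or __ts_placeholder, get generated values like now() or 0, and existing columns may be aliased to their sink-table counterparts. A toy, std-only illustration of the "which columns need auto values" step; the column names are illustrative and the real rewriter works on DataFusion plan nodes:

fn main() {
    // Query output columns vs. sink table columns (illustrative names only).
    let query_cols = vec!["ts", "host", "avg_cpu"];
    let sink_cols = vec!["ts", "host", "avg_cpu", "update_at"];

    // Columns the rewriter would have to fill with generated values
    // (e.g. `now()` for an extra timestamp, `0` for `__ts_placeholder`).
    let auto_cols: Vec<&str> = sink_cols
        .iter()
        .copied()
        .filter(|c| !query_cols.contains(c))
        .collect();
    assert_eq!(auto_cols, vec!["update_at"]);
}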


@@ -302,6 +302,10 @@ impl LogStore for KafkaLogStore {
}, },
)) ))
.await?; .await?;
debug!(
"Appended batch to Kafka, region_grouped_max_offset: {:?}",
region_grouped_max_offset
);
Ok(AppendBatchResponse { Ok(AppendBatchResponse {
last_entry_ids: region_grouped_max_offset.into_iter().collect(), last_entry_ids: region_grouped_max_offset.into_iter().collect(),
@@ -362,6 +366,17 @@ impl LogStore for KafkaLogStore {
.context(GetOffsetSnafu { .context(GetOffsetSnafu {
topic: &provider.topic, topic: &provider.topic,
})?; })?;
let latest_offset = (end_offset as u64).saturating_sub(1);
self.topic_stats
.entry(provider.clone())
.and_modify(|stat| {
stat.latest_offset = stat.latest_offset.max(latest_offset);
})
.or_insert_with(|| TopicStat {
latest_offset,
record_size: 0,
record_num: 0,
});
let region_indexes = if let (Some(index), Some(collector)) = let region_indexes = if let (Some(index), Some(collector)) =
(index, self.client_manager.global_index_collector()) (index, self.client_manager.global_index_collector())
@@ -550,6 +565,7 @@ mod tests {
use futures::TryStreamExt; use futures::TryStreamExt;
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
use rand::Rng; use rand::Rng;
use rskafka::client::partition::OffsetAt;
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry}; use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
use store_api::logstore::provider::Provider; use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore; use store_api::logstore::LogStore;
@@ -713,8 +729,16 @@ mod tests {
.for_each(|entry| entry.set_entry_id(0)); .for_each(|entry| entry.set_entry_id(0));
assert_eq!(expected_entries, actual_entries); assert_eq!(expected_entries, actual_entries);
} }
let high_wathermark = logstore.latest_entry_id(&provider).unwrap(); let latest_entry_id = logstore.latest_entry_id(&provider).unwrap();
assert_eq!(high_wathermark, 99); let client = logstore
.client_manager
.get_or_insert(provider.as_kafka_provider().unwrap())
.await
.unwrap();
assert_eq!(latest_entry_id, 99);
// The latest offset is the offset of the last record plus one.
let latest = client.client().get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(latest, 100);
} }
#[tokio::test] #[tokio::test]
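The assertions above hinge on Kafka's offset convention: OffsetAt::Latest returns the high watermark, i.e. the offset of the last record plus one, so the latest entry id is that value minus one, saturating at zero for an empty topic. A std-only sketch of the conversion used in both the log store and the producer worker:

fn latest_entry_id_from_high_watermark(high_watermark: i64) -> u64 {
    // The high watermark is "last record offset + 1"; an empty topic stays at 0.
    (high_watermark as u64).saturating_sub(1)
}

fn main() {
    assert_eq!(latest_entry_id_from_high_watermark(0), 0); // empty topic
    assert_eq!(latest_entry_id_from_high_watermark(100), 99); // records 0..=99
}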


@@ -112,11 +112,11 @@ mod tests {
let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset; let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
assert_eq!(current_latest_offset, 0); assert_eq!(current_latest_offset, 0);
let record = vec![record()]; let record = vec![record(), record()];
let region = RegionId::new(1, 1); let region = RegionId::new(1, 1);
producer.produce(region, record.clone()).await.unwrap(); producer.produce(region, record.clone()).await.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await; tokio::time::sleep(Duration::from_millis(150)).await;
let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset; let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
assert_eq!(current_latest_offset, record.len() as u64); assert_eq!(current_latest_offset, record.len() as u64 - 1);
} }
} }


@@ -33,9 +33,12 @@ impl BackgroundProducerWorker {
.context(error::GetOffsetSnafu { .context(error::GetOffsetSnafu {
topic: &self.provider.topic, topic: &self.provider.topic,
}) { }) {
Ok(offset) => match self.topic_stats.entry(self.provider.clone()) { Ok(highwatermark) => {
// The highwatermark is the offset of the last record plus one.
let offset = (highwatermark as u64).saturating_sub(1);
match self.topic_stats.entry(self.provider.clone()) {
dashmap::Entry::Occupied(mut occupied_entry) => { dashmap::Entry::Occupied(mut occupied_entry) => {
let offset = offset as u64;
let stat = occupied_entry.get_mut(); let stat = occupied_entry.get_mut();
if stat.latest_offset < offset { if stat.latest_offset < offset {
stat.latest_offset = offset; stat.latest_offset = offset;
@@ -47,7 +50,7 @@ impl BackgroundProducerWorker {
} }
dashmap::Entry::Vacant(vacant_entry) => { dashmap::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(TopicStat { vacant_entry.insert(TopicStat {
latest_offset: offset as u64, latest_offset: offset,
record_size: 0, record_size: 0,
record_num: 0, record_num: 0,
}); });
@@ -56,7 +59,8 @@ impl BackgroundProducerWorker {
self.provider.topic, offset self.provider.topic, offset
); );
} }
}, }
}
Err(err) => { Err(err) => {
error!(err; "Failed to get latest offset for topic {}", self.provider.topic); error!(err; "Failed to get latest offset for topic {}", self.provider.topic);
} }


@@ -152,13 +152,9 @@ fn align_ts(ts: i64, interval: Duration) -> i64 {
impl PersistStatsHandler { impl PersistStatsHandler {
/// Creates a new [`PersistStatsHandler`]. /// Creates a new [`PersistStatsHandler`].
pub fn new(inserter: Box<dyn Inserter>, mut persist_interval: Duration) -> Self { pub fn new(inserter: Box<dyn Inserter>, mut persist_interval: Duration) -> Self {
if persist_interval < Duration::from_secs(60) { if persist_interval < Duration::from_mins(10) {
warn!("persist_interval is less than 60 seconds, set to 60 seconds"); warn!("persist_interval is less than 10 minutes, set to 10 minutes");
persist_interval = Duration::from_secs(60); persist_interval = Duration::from_mins(10);
}
if persist_interval.as_millis() == 0 {
warn!("persist_interval as milliseconds is zero, set to 60 second");
persist_interval = Duration::from_secs(60);
} }
Self { Self {
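Duration::from_mins comes from the nightly duration_constructors_lite feature, which is presumably why that gate is added in the crate root a little further down; on stable the same 10-minute floor can be written with from_secs. A minimal sketch of the clamp under that assumption:

use std::time::Duration;

const MIN_PERSIST_INTERVAL: Duration = Duration::from_secs(10 * 60);

/// Hypothetical stable-Rust equivalent of the clamp in PersistStatsHandler::new.
fn clamp_persist_interval(requested: Duration) -> Duration {
    if requested < MIN_PERSIST_INTERVAL {
        // The real handler also emits a warning before overriding the value.
        MIN_PERSIST_INTERVAL
    } else {
        requested
    }
}

fn main() {
    assert_eq!(clamp_persist_interval(Duration::from_secs(60)), MIN_PERSIST_INTERVAL);
    assert_eq!(clamp_persist_interval(Duration::from_secs(3600)), Duration::from_secs(3600));
}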


@@ -16,6 +16,7 @@
#![feature(assert_matches)] #![feature(assert_matches)]
#![feature(hash_set_entry)] #![feature(hash_set_entry)]
#![feature(let_chains)] #![feature(let_chains)]
#![feature(duration_constructors_lite)]
#![feature(duration_constructors)] #![feature(duration_constructors)]
pub mod bootstrap; pub mod bootstrap;


@@ -114,8 +114,8 @@ pub struct StatsPersistenceOptions {
impl Default for StatsPersistenceOptions { impl Default for StatsPersistenceOptions {
fn default() -> Self { fn default() -> Self {
Self { Self {
ttl: Duration::from_days(30), ttl: Duration::ZERO,
interval: Duration::from_secs(60), interval: Duration::from_mins(10),
} }
} }
} }


@@ -82,7 +82,7 @@ lazy_static! {
.unwrap(); .unwrap();
/// The triggered region flush total counter. /// The triggered region flush total counter.
pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec = pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec =
register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap(); register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name"]).unwrap();
/// The triggered region checkpoint total counter. /// The triggered region checkpoint total counter.
pub static ref METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL: IntCounterVec = pub static ref METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL: IntCounterVec =


@@ -19,7 +19,6 @@ use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::RegionInfo; use common_meta::key::datanode_table::RegionInfo;
use common_meta::wal_options_allocator::extract_topic_from_wal_options;
use common_meta::RegionIdent; use common_meta::RegionIdent;
use common_procedure::{Context as ProcedureContext, Status}; use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info; use common_telemetry::info;
@@ -68,7 +67,6 @@ impl OpenCandidateRegion {
async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> { async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
let pc = &ctx.persistent_ctx; let pc = &ctx.persistent_ctx;
let table_id = pc.region_id.table_id(); let table_id = pc.region_id.table_id();
let region_id = pc.region_id;
let region_number = pc.region_id.region_number(); let region_number = pc.region_id.region_number();
let candidate_id = pc.to_peer.id; let candidate_id = pc.to_peer.id;
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
@@ -80,15 +78,7 @@ impl OpenCandidateRegion {
engine, engine,
} = datanode_table_value.region_info.clone(); } = datanode_table_value.region_info.clone();
let checkpoint = let open_instruction = Instruction::OpenRegion(OpenRegion::new(
if let Some(topic) = extract_topic_from_wal_options(region_id, &region_wal_options) {
ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
} else {
None
};
let open_instruction = Instruction::OpenRegion(
OpenRegion::new(
RegionIdent { RegionIdent {
datanode_id: candidate_id, datanode_id: candidate_id,
table_id, table_id,
@@ -99,12 +89,7 @@ impl OpenCandidateRegion {
region_options, region_options,
region_wal_options, region_wal_options,
true, true,
) ));
.with_replay_entry_id(checkpoint.map(|checkpoint| checkpoint.entry_id))
.with_metadata_replay_entry_id(
checkpoint.and_then(|checkpoint| checkpoint.metadata_entry_id),
),
);
Ok(open_instruction) Ok(open_instruction)
} }
@@ -241,8 +226,6 @@ mod tests {
region_options: Default::default(), region_options: Default::default(),
region_wal_options: Default::default(), region_wal_options: Default::default(),
skip_wal_replay: true, skip_wal_replay: true,
replay_entry_id: None,
metadata_replay_entry_id: None,
}) })
} }


@@ -19,6 +19,7 @@ use api::v1::meta::MailboxMessage;
use common_meta::ddl::utils::parse_region_wal_options; use common_meta::ddl::utils::parse_region_wal_options;
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply}; use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_meta::lock_key::RemoteWalLock; use common_meta::lock_key::RemoteWalLock;
use common_meta::wal_options_allocator::extract_topic_from_wal_options;
use common_procedure::{Context as ProcedureContext, Status}; use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, warn}; use common_telemetry::{error, warn};
use common_wal::options::WalOptions; use common_wal::options::WalOptions;
@@ -111,23 +112,40 @@ impl UpgradeCandidateRegion {
} }
/// Builds upgrade region instruction. /// Builds upgrade region instruction.
fn build_upgrade_region_instruction( async fn build_upgrade_region_instruction(
&self, &self,
ctx: &Context, ctx: &mut Context,
replay_timeout: Duration, replay_timeout: Duration,
) -> Instruction { ) -> Result<Instruction> {
let pc = &ctx.persistent_ctx; let pc = &ctx.persistent_ctx;
let region_id = pc.region_id; let region_id = pc.region_id;
let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id; let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id; let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
// Try our best to retrieve replay checkpoint.
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok();
let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| {
extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options)
}) {
ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
} else {
None
};
Instruction::UpgradeRegion(UpgradeRegion { let upgrade_instruction = Instruction::UpgradeRegion(
UpgradeRegion {
region_id, region_id,
last_entry_id, last_entry_id,
metadata_last_entry_id, metadata_last_entry_id,
replay_timeout: Some(replay_timeout), replay_timeout: Some(replay_timeout),
location_id: Some(ctx.persistent_ctx.from_peer.id), location_id: Some(ctx.persistent_ctx.from_peer.id),
}) replay_entry_id: None,
metadata_replay_entry_id: None,
}
.with_replay_entry_id(checkpoint.map(|c| c.entry_id))
.with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
);
Ok(upgrade_instruction)
} }
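The checkpoint lookup in build_upgrade_region_instruction is deliberately best effort: a failed metadata read or a missing checkpoint both collapse to None, and the upgrade instruction is still sent, just without replay hints. A std-only sketch of that Result/Option collapsing, with hypothetical stand-ins for the two fallible steps:

fn lookup_topic(ok: bool) -> Result<Option<String>, String> {
    if ok { Ok(Some("topic_0".to_string())) } else { Err("kv backend unavailable".into()) }
}

fn fetch_checkpoint(_topic: &str) -> Result<Option<u64>, String> {
    Ok(Some(42))
}

fn main() {
    // Errors and missing values both collapse to None, so the instruction is
    // still built, just without a replay checkpoint.
    let checkpoint = lookup_topic(true)
        .ok()
        .flatten()
        .and_then(|topic| fetch_checkpoint(&topic).ok().flatten());
    assert_eq!(checkpoint, Some(42));

    let checkpoint = lookup_topic(false)
        .ok()
        .flatten()
        .and_then(|topic| fetch_checkpoint(&topic).ok().flatten());
    assert_eq!(checkpoint, None);
}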
/// Tries to upgrade a candidate region. /// Tries to upgrade a candidate region.
@@ -144,16 +162,19 @@ impl UpgradeCandidateRegion {
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible). /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
/// - [ExceededDeadline](error::Error::ExceededDeadline) /// - [ExceededDeadline](error::Error::ExceededDeadline)
/// - Invalid JSON (impossible). /// - Invalid JSON (impossible).
async fn upgrade_region(&self, ctx: &Context) -> Result<()> { async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
let candidate = &pc.to_peer;
let operation_timeout = let operation_timeout =
ctx.next_operation_timeout() ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu { .context(error::ExceededDeadlineSnafu {
operation: "Upgrade region", operation: "Upgrade region",
})?; })?;
let upgrade_instruction = self.build_upgrade_region_instruction(ctx, operation_timeout); let upgrade_instruction = self
.build_upgrade_region_instruction(ctx, operation_timeout)
.await?;
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
let candidate = &pc.to_peer;
let msg = MailboxMessage::json_message( let msg = MailboxMessage::json_message(
&format!("Upgrade candidate region: {}", region_id), &format!("Upgrade candidate region: {}", region_id),
@@ -283,8 +304,12 @@ impl UpgradeCandidateRegion {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::assert_matches::assert_matches; use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer; use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId; use store_api::storage::RegionId;
use super::*; use super::*;
@@ -308,14 +333,33 @@ mod tests {
} }
} }
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
let table_info =
new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(ctx.persistent_ctx.region_id),
leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
..Default::default()
}];
ctx.table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
wal_options,
)
.await
.unwrap();
}
#[tokio::test] #[tokio::test]
async fn test_datanode_is_unreachable() { async fn test_datanode_is_unreachable() {
let state = UpgradeCandidateRegion::default(); let state = UpgradeCandidateRegion::default();
let persistent_context = new_persistent_context(); let persistent_context = new_persistent_context();
let env = TestingEnv::new(); let env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
let err = state.upgrade_region(&ctx).await.unwrap_err(); let err = state.upgrade_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::PusherNotFound { .. }); assert_matches!(err, Error::PusherNotFound { .. });
assert!(!err.is_retryable()); assert!(!err.is_retryable());
@@ -328,7 +372,8 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id; let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new(); let mut env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context(); let mailbox_ctx = env.mailbox_context();
let (tx, rx) = tokio::sync::mpsc::channel(1); let (tx, rx) = tokio::sync::mpsc::channel(1);
@@ -339,7 +384,7 @@ mod tests {
drop(rx); drop(rx);
let err = state.upgrade_region(&ctx).await.unwrap_err(); let err = state.upgrade_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::PushMessage { .. }); assert_matches!(err, Error::PushMessage { .. });
assert!(!err.is_retryable()); assert!(!err.is_retryable());
@@ -351,10 +396,11 @@ mod tests {
let persistent_context = new_persistent_context(); let persistent_context = new_persistent_context();
let env = TestingEnv::new(); let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
ctx.volatile_ctx.metrics.operations_elapsed = ctx.volatile_ctx.metrics.operations_elapsed =
ctx.persistent_ctx.timeout + Duration::from_secs(1); ctx.persistent_ctx.timeout + Duration::from_secs(1);
let err = state.upgrade_region(&ctx).await.unwrap_err(); let err = state.upgrade_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::ExceededDeadline { .. }); assert_matches!(err, Error::ExceededDeadline { .. });
assert!(!err.is_retryable()); assert!(!err.is_retryable());
@@ -367,7 +413,8 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id; let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new(); let mut env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context(); let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone(); let mailbox = mailbox_ctx.mailbox().clone();
@@ -379,7 +426,7 @@ mod tests {
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id))); send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
let err = state.upgrade_region(&ctx).await.unwrap_err(); let err = state.upgrade_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::UnexpectedInstructionReply { .. }); assert_matches!(err, Error::UnexpectedInstructionReply { .. });
assert!(!err.is_retryable()); assert!(!err.is_retryable());
} }
@@ -391,7 +438,8 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id; let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new(); let mut env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context(); let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone(); let mailbox = mailbox_ctx.mailbox().clone();
@@ -411,7 +459,7 @@ mod tests {
)) ))
}); });
let err = state.upgrade_region(&ctx).await.unwrap_err(); let err = state.upgrade_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::RetryLater { .. }); assert_matches!(err, Error::RetryLater { .. });
assert!(err.is_retryable()); assert!(err.is_retryable());
@@ -425,7 +473,8 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id; let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new(); let mut env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context(); let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone(); let mailbox = mailbox_ctx.mailbox().clone();
@@ -439,7 +488,7 @@ mod tests {
Ok(new_upgrade_region_reply(id, true, false, None)) Ok(new_upgrade_region_reply(id, true, false, None))
}); });
let err = state.upgrade_region(&ctx).await.unwrap_err(); let err = state.upgrade_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::Unexpected { .. }); assert_matches!(err, Error::Unexpected { .. });
assert!(!err.is_retryable()); assert!(!err.is_retryable());
@@ -457,7 +506,8 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id; let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new(); let mut env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context(); let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone(); let mailbox = mailbox_ctx.mailbox().clone();
@@ -471,7 +521,7 @@ mod tests {
Ok(new_upgrade_region_reply(id, false, true, None)) Ok(new_upgrade_region_reply(id, false, true, None))
}); });
let err = state.upgrade_region(&ctx).await.unwrap_err(); let err = state.upgrade_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::RetryLater { .. }); assert_matches!(err, Error::RetryLater { .. });
assert!(err.is_retryable()); assert!(err.is_retryable());
@@ -491,7 +541,7 @@ mod tests {
Ok(new_upgrade_region_reply(id, false, true, None)) Ok(new_upgrade_region_reply(id, false, true, None))
}); });
state.upgrade_region(&ctx).await.unwrap(); state.upgrade_region(&mut ctx).await.unwrap();
} }
#[tokio::test] #[tokio::test]
@@ -503,6 +553,7 @@ mod tests {
let mut env = TestingEnv::new(); let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context(); let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone(); let mailbox = mailbox_ctx.mailbox().clone();
@@ -563,6 +614,7 @@ mod tests {
let mut env = TestingEnv::new(); let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context(); let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone(); let mailbox = mailbox_ctx.mailbox().clone();
@@ -621,6 +673,7 @@ mod tests {
let mut env = TestingEnv::new(); let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context); let mut ctx = env.context_factory().new_context(persistent_context);
prepare_table_metadata(&ctx, HashMap::default()).await;
let mailbox_ctx = env.mailbox_context(); let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone(); let mailbox = mailbox_ctx.mailbox().clone();
ctx.volatile_ctx.metrics.operations_elapsed = ctx.volatile_ctx.metrics.operations_elapsed =


@@ -29,7 +29,6 @@ use common_time::util::current_time_millis;
use common_wal::config::kafka::common::{ use common_wal::config::kafka::common::{
DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE, DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
}; };
use itertools::Itertools;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId; use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
@@ -223,31 +222,34 @@ impl RegionFlushTrigger {
&self, &self,
topic: &str, topic: &str,
region_ids: &[RegionId], region_ids: &[RegionId],
topic_regions: &HashMap<RegionId, TopicRegionValue>,
leader_regions: &HashMap<RegionId, LeaderRegion>, leader_regions: &HashMap<RegionId, LeaderRegion>,
) -> Result<()> { ) -> Result<()> {
if region_ids.is_empty() {
return Ok(());
}
let regions = region_ids let regions = region_ids
.iter() .iter()
.flat_map(|region_id| match leader_regions.get(region_id) { .flat_map(|region_id| match leader_regions.get(region_id) {
Some(leader_region) => { Some(leader_region) => should_persist_region_checkpoint(
let entry_id = leader_region.manifest.replay_entry_id(); leader_region,
let metadata_entry_id = leader_region.manifest.metadata_replay_entry_id(); topic_regions
.get(region_id)
Some(( .cloned()
.and_then(|value| value.checkpoint),
)
.map(|checkpoint| {
(
TopicRegionKey::new(*region_id, topic), TopicRegionKey::new(*region_id, topic),
Some(TopicRegionValue::new(Some(ReplayCheckpoint::new( Some(TopicRegionValue::new(Some(checkpoint))),
entry_id, )
metadata_entry_id, }),
)))),
))
}
None => None, None => None,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// The `chunks` call will panic if the chunk size is zero, so we return early if there are no regions to persist.
if regions.is_empty() {
return Ok(());
}
let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops(); let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
let batch_size = max_txn_ops.min(regions.len()); let batch_size = max_txn_ops.min(regions.len());
for batch in regions.chunks(batch_size) { for batch in regions.chunks(batch_size) {
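The early return matters because slice::chunks panics on a zero chunk size, and batch_size = max_txn_ops.min(regions.len()) is zero exactly when regions is empty. A std-only illustration of the guard:

fn main() {
    let regions: Vec<u32> = vec![];
    let max_txn_ops = 128usize;

    // Without the guard, an empty `regions` would make `batch_size` zero and
    // `chunks(0)` would panic at runtime.
    if regions.is_empty() {
        return;
    }
    let batch_size = max_txn_ops.min(regions.len());
    for batch in regions.chunks(batch_size) {
        println!("{batch:?}");
    }
}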
@@ -271,14 +273,14 @@ impl RegionFlushTrigger {
latest_entry_id: u64, latest_entry_id: u64,
avg_record_size: usize, avg_record_size: usize,
) -> Result<()> { ) -> Result<()> {
let region_ids = self let topic_regions = self
.table_metadata_manager .table_metadata_manager
.topic_region_manager() .topic_region_manager()
.regions(topic) .regions(topic)
.await .await
.context(error::TableMetadataManagerSnafu)?; .context(error::TableMetadataManagerSnafu)?;
if region_ids.is_empty() { if topic_regions.is_empty() {
debug!("No regions found for topic: {}", topic); debug!("No regions found for topic: {}", topic);
return Ok(()); return Ok(());
} }
@@ -286,7 +288,7 @@ impl RegionFlushTrigger {
// Filters regions need to persist checkpoints. // Filters regions need to persist checkpoints.
let regions_to_persist = filter_regions_by_replay_size( let regions_to_persist = filter_regions_by_replay_size(
topic, topic,
region_ids topic_regions
.iter() .iter()
.map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())), .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
avg_record_size as u64, avg_record_size as u64,
@@ -295,33 +297,25 @@ impl RegionFlushTrigger {
); );
let region_manifests = self let region_manifests = self
.leader_region_registry .leader_region_registry
.batch_get(region_ids.keys().cloned()); .batch_get(topic_regions.keys().cloned());
if let Err(err) = self if let Err(err) = self
.persist_region_checkpoints(topic, &regions_to_persist, &region_manifests) .persist_region_checkpoints(
topic,
&regions_to_persist,
&topic_regions,
&region_manifests,
)
.await .await
{ {
error!(err; "Failed to persist region checkpoints for topic: {}", topic); error!(err; "Failed to persist region checkpoints for topic: {}", topic);
} }
let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = region_manifests let regions = region_manifests
.into_iter() .into_iter()
.partition_map(|(region_id, region)| { .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
if !region.manifest.is_inactive() { .collect::<Vec<_>>();
itertools::Either::Left((region_id, region.manifest.prunable_entry_id())) let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
} else {
itertools::Either::Right((region_id, region.manifest.prunable_entry_id()))
}
});
let min_entry_id = inactive_regions
.iter()
.min_by_key(|(_, entry_id)| *entry_id);
let min_entry_id = active_regions
.iter()
.min_by_key(|(_, entry_id)| *entry_id)
.or(min_entry_id);
if let Some((_, min_entry_id)) = min_entry_id { if let Some((_, min_entry_id)) = min_entry_id {
let replay_size = (latest_entry_id.saturating_sub(*min_entry_id)) let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
.saturating_mul(avg_record_size as u64); .saturating_mul(avg_record_size as u64);
@@ -331,45 +325,28 @@ impl RegionFlushTrigger {
} }
// Selects regions to flush from the set of active regions. // Selects regions to flush from the set of active regions.
let mut regions_to_flush = filter_regions_by_replay_size( let regions_to_flush = filter_regions_by_replay_size(
topic, topic,
active_regions.into_iter(), regions.into_iter(),
avg_record_size as u64, avg_record_size as u64,
latest_entry_id, latest_entry_id,
self.flush_trigger_size, self.flush_trigger_size,
); );
let active_regions_num = regions_to_flush.len();
// Selects regions to flush from the set of inactive regions.
// For inactive regions, we use a lower flush trigger size (half of the normal size)
// to encourage more aggressive flushing to update the region's topic latest entry id.
let inactive_regions_to_flush = filter_regions_by_replay_size(
topic,
inactive_regions.into_iter(),
avg_record_size as u64,
latest_entry_id,
self.flush_trigger_size / 2,
);
let inactive_regions_num = inactive_regions_to_flush.len();
regions_to_flush.extend(inactive_regions_to_flush);
// Sends flush instructions to datanodes. // Sends flush instructions to datanodes.
if !regions_to_flush.is_empty() { if !regions_to_flush.is_empty() {
self.send_flush_instructions(&regions_to_flush).await?; self.send_flush_instructions(&regions_to_flush).await?;
debug!( debug!(
"Sent {} flush instructions to datanodes for topic: '{}' ({} inactive regions)", "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
regions_to_flush.len(), regions_to_flush.len(),
topic, topic,
inactive_regions_num, regions_to_flush,
); );
} }
metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
.with_label_values(&[topic, "active"]) .with_label_values(&[topic])
.inc_by(active_regions_num as u64); .inc_by(regions_to_flush.len() as u64);
metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
.with_label_values(&[topic, "inactive"])
.inc_by(inactive_regions_num as u64);
Ok(()) Ok(())
} }
@@ -408,6 +385,26 @@ impl RegionFlushTrigger {
} }
} }
/// Determines whether a region checkpoint should be persisted based on current and persisted state.
fn should_persist_region_checkpoint(
current: &LeaderRegion,
persisted: Option<ReplayCheckpoint>,
) -> Option<ReplayCheckpoint> {
let new_checkpoint = ReplayCheckpoint::new(
current.manifest.replay_entry_id(),
current.manifest.metadata_replay_entry_id(),
);
let Some(persisted) = persisted else {
return Some(new_checkpoint);
};
if new_checkpoint > persisted {
return Some(new_checkpoint);
}
None
}
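The new_checkpoint > persisted comparison relies on ReplayCheckpoint ordering first by entry_id and only then by the optional metadata entry id, as the tests at the end of this file suggest. A std-only reminder of how that lexicographic, Option-aware ordering behaves on plain tuples:

fn main() {
    // The first field decides unless equal: a lower entry_id loses even if the
    // metadata entry id advanced (compare the `(80, 11)` vs `(90, 10)` test case).
    assert!((80u64, Some(11u64)) < (90u64, Some(10u64)));
    // With equal entry_id, the larger metadata entry id wins.
    assert!((100u64, Some(10u64)) > (100u64, Some(8u64)));
    // `Some(_)` is greater than `None`, so a checkpoint that gains a metadata
    // entry id is considered newer.
    assert!((100u64, Some(10u64)) > (100u64, None::<u64>));
}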
/// Filter regions based on the estimated replay size. /// Filter regions based on the estimated replay size.
/// ///
/// Returns the regions if its estimated replay size exceeds the given threshold. /// Returns the regions if its estimated replay size exceeds the given threshold.
@@ -496,6 +493,7 @@ fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use common_base::readable_size::ReadableSize; use common_base::readable_size::ReadableSize;
use common_meta::region_registry::LeaderRegionManifestInfo;
use store_api::storage::RegionId; use store_api::storage::RegionId;
use super::*; use super::*;
@@ -626,4 +624,92 @@ mod tests {
// Only regions 1,1 and 1,2 should be flushed // Only regions 1,1 and 1,2 should be flushed
assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]); assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
} }
fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
LeaderRegion {
datanode_id: 1,
manifest: LeaderRegionManifestInfo::Metric {
data_manifest_version: 1,
data_flushed_entry_id: replay_entry_id,
data_topic_latest_entry_id: 0,
metadata_manifest_version: 1,
metadata_flushed_entry_id: metadata_replay_entry_id,
metadata_topic_latest_entry_id: 0,
},
}
}
fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
LeaderRegion {
datanode_id: 1,
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 1,
flushed_entry_id: replay_entry_id,
topic_latest_entry_id: 0,
},
}
}
#[test]
fn test_should_persist_region_checkpoint() {
// `persisted` is none
let current = metric_leader_region(100, 10);
let result = should_persist_region_checkpoint(&current, None).unwrap();
assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
// `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
let current = mito_leader_region(100);
let result =
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
.unwrap();
assert_eq!(result, ReplayCheckpoint::new(100, None));
let current = metric_leader_region(100, 10);
let result =
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
.unwrap();
assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
// `persisted.metadata_entry_id` is less than `current.manifest.metadata_replay_entry_id()`
let current = metric_leader_region(100, 10);
let result =
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
.unwrap();
assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
// `persisted.metadata_entry_id` is none
let current = metric_leader_region(100, 10);
let result =
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
.unwrap();
assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
// `current.manifest.metadata_replay_entry_id()` is none
let current = mito_leader_region(100);
let result =
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
.is_none();
assert!(result);
// `persisted.entry_id` is equal to `current.manifest.replay_entry_id()`
let current = metric_leader_region(100, 10);
let result =
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
assert!(result.is_none());
let current = mito_leader_region(100);
let result =
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
assert!(result.is_none());
// `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
// `persisted.metadata_entry_id` is greater than `current.manifest.metadata_replay_entry_id()`
let current = metric_leader_region(80, 11);
let result =
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
assert!(result.is_none());
let current = mito_leader_region(80);
let result =
should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
assert!(result.is_none());
}
} }


@@ -97,7 +97,7 @@ impl store_server::Store for Metasrv {
let req = req.into_inner(); let req = req.into_inner();
let _timer = METRIC_META_KV_REQUEST_ELAPSED let _timer = METRIC_META_KV_REQUEST_ELAPSED
.with_label_values(&[self.kv_backend().name(), "batch_pub"]) .with_label_values(&[self.kv_backend().name(), "batch_put"])
.start_timer(); .start_timer();
let req: BatchPutRequest = req.into(); let req: BatchPutRequest = req.into();


@@ -15,7 +15,9 @@
use common_telemetry::debug; use common_telemetry::debug;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use store_api::region_engine::RegionEngine; use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest}; use store_api::region_request::{
AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint,
};
use store_api::storage::RegionId; use store_api::storage::RegionId;
use crate::engine::MetricEngineInner; use crate::engine::MetricEngineInner;
@@ -59,6 +61,10 @@ impl MetricEngineInner {
entry_id: req.metadata_entry_id, entry_id: req.metadata_entry_id,
metadata_entry_id: None, metadata_entry_id: None,
location_id: req.location_id, location_id: req.location_id,
checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
entry_id: c.metadata_entry_id.unwrap_or_default(),
metadata_entry_id: None,
}),
}), }),
) )
.await .await
@@ -73,6 +79,10 @@ impl MetricEngineInner {
entry_id: req.entry_id, entry_id: req.entry_id,
metadata_entry_id: None, metadata_entry_id: None,
location_id: req.location_id, location_id: req.location_id,
checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
entry_id: c.entry_id,
metadata_entry_id: None,
}),
}), }),
) )
.await .await

View File

@@ -189,6 +189,11 @@ impl AccessLayer {
&self.puffin_manager_factory &self.puffin_manager_factory
} }
/// Returns the intermediate manager.
pub fn intermediate_manager(&self) -> &IntermediateManager {
&self.intermediate_manager
}
/// Deletes a SST file (and its index file if it has one) with given file id. /// Deletes a SST file (and its index file if it has one) with given file id.
pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> { pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
let path = location::sst_file_path(&self.table_dir, file_meta.file_id(), self.path_type); let path = location::sst_file_path(&self.table_dir, file_meta.file_id(), self.path_type);


@@ -1077,6 +1077,7 @@ mod tests {
let staging_manifest_ctx = { let staging_manifest_ctx = {
let manager = RegionManifestManager::new( let manager = RegionManifestManager::new(
version_control.current().version.metadata.clone(), version_control.current().version.metadata.clone(),
0,
RegionManifestOptions { RegionManifestOptions {
manifest_dir: "".to_string(), manifest_dir: "".to_string(),
object_store: env.access_layer.object_store().clone(), object_store: env.access_layer.object_store().clone(),


@@ -127,8 +127,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false, set_writable: false,
entry_id: last_entry_id, entry_id: last_entry_id,
metadata_entry_id: None, ..Default::default()
location_id: None,
}), }),
) )
.await; .await;
@@ -160,8 +159,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true, set_writable: true,
entry_id: last_entry_id, entry_id: last_entry_id,
metadata_entry_id: None, ..Default::default()
location_id: None,
}), }),
) )
.await; .await;
@@ -251,8 +249,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false, set_writable: false,
entry_id: incorrect_last_entry_id, entry_id: incorrect_last_entry_id,
metadata_entry_id: None, ..Default::default()
location_id: None,
}), }),
) )
.await .await
@@ -269,8 +266,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false, set_writable: false,
entry_id: incorrect_last_entry_id, entry_id: incorrect_last_entry_id,
metadata_entry_id: None, ..Default::default()
location_id: None,
}), }),
) )
.await; .await;
@@ -340,9 +336,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
region_id, region_id,
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false, set_writable: false,
entry_id: None, ..Default::default()
metadata_entry_id: None,
location_id: None,
}), }),
) )
.await; .await;
@@ -372,9 +366,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
region_id, region_id,
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true, set_writable: true,
entry_id: None, ..Default::default()
metadata_entry_id: None,
location_id: None,
}), }),
) )
.await; .await;
@@ -465,9 +457,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
region_id, region_id,
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false, set_writable: false,
entry_id: None, ..Default::default()
metadata_entry_id: None,
location_id: None,
}), }),
) )
.await; .await;
@@ -503,9 +493,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
region_id, region_id,
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true, set_writable: true,
entry_id: None, ..Default::default()
metadata_entry_id: None,
location_id: None,
}), }),
) )
.await; .await;
@@ -652,9 +640,7 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
region_id, region_id,
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true, set_writable: true,
entry_id: None, ..Default::default()
metadata_entry_id: None,
location_id: None,
}), }),
) )
.await; .await;
@@ -715,9 +701,7 @@ async fn test_catchup_not_exist() {
non_exist_region_id, non_exist_region_id,
RegionRequest::Catchup(RegionCatchupRequest { RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true, set_writable: true,
entry_id: None, ..Default::default()
metadata_entry_id: None,
location_id: None,
}), }),
) )
.await .await


@@ -27,8 +27,8 @@ use crate::error::{
self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result, self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
}; };
use crate::manifest::action::{ use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
RegionMetaActionList, RegionMetaAction, RegionMetaActionList,
}; };
use crate::manifest::checkpointer::Checkpointer; use crate::manifest::checkpointer::Checkpointer;
use crate::manifest::storage::{ use crate::manifest::storage::{
@@ -150,6 +150,7 @@ impl RegionManifestManager {
/// Constructs a region's manifest and persist it. /// Constructs a region's manifest and persist it.
pub async fn new( pub async fn new(
metadata: RegionMetadataRef, metadata: RegionMetadataRef,
flushed_entry_id: u64,
options: RegionManifestOptions, options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>, total_manifest_size: Arc<AtomicU64>,
manifest_version: Arc<AtomicU64>, manifest_version: Arc<AtomicU64>,
@@ -163,8 +164,8 @@ impl RegionManifestManager {
); );
info!( info!(
"Creating region manifest in {} with metadata {:?}", "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
options.manifest_dir, metadata options.manifest_dir, metadata, flushed_entry_id
); );
let version = MIN_VERSION; let version = MIN_VERSION;
@@ -184,9 +185,21 @@ impl RegionManifestManager {
options.manifest_dir, manifest options.manifest_dir, manifest
); );
let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })];
if flushed_entry_id > 0 {
actions.push(RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: Some(flushed_entry_id),
flushed_sequence: None,
}));
}
// Persist region change. // Persist region change.
let action_list = let action_list = RegionMetaActionList::new(actions);
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata }));
// New region is not in staging mode. // New region is not in staging mode.
// TODO(ruihang): add staging mode support if needed. // TODO(ruihang): add staging mode support if needed.
store.save(version, &action_list.encode()?, false).await?; store.save(version, &action_list.encode()?, false).await?;


@@ -1122,6 +1122,7 @@ mod tests {
let staging_ctx = { let staging_ctx = {
let manager = RegionManifestManager::new( let manager = RegionManifestManager::new(
version_control.current().version.metadata.clone(), version_control.current().version.metadata.clone(),
0,
RegionManifestOptions { RegionManifestOptions {
manifest_dir: "".to_string(), manifest_dir: "".to_string(),
object_store: env.access_layer.object_store().clone(), object_store: env.access_layer.object_store().clone(),
@@ -1187,6 +1188,7 @@ mod tests {
let manager = RegionManifestManager::new( let manager = RegionManifestManager::new(
metadata.clone(), metadata.clone(),
0,
RegionManifestOptions { RegionManifestOptions {
manifest_dir: "".to_string(), manifest_dir: "".to_string(),
object_store: access_layer.object_store().clone(), object_store: access_layer.object_store().clone(),

View File

@@ -238,8 +238,11 @@ impl RegionOpener {
// Create a manifest manager for this region and writes regions to the manifest file. // Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = let region_manifest_options =
Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?; Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
// For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
let manifest_manager = RegionManifestManager::new( let manifest_manager = RegionManifestManager::new(
metadata.clone(), metadata.clone(),
flushed_entry_id,
region_manifest_options, region_manifest_options,
self.stats.total_manifest_size.clone(), self.stats.total_manifest_size.clone(),
self.stats.manifest_version.clone(), self.stats.manifest_version.clone(),
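The comment above is the crux of the remote-WAL change: a region freshly created on a shared Kafka topic records the topic's current latest entry id as its flushed entry id, presumably so a later replay does not start from entry 0 of a topic shared with other regions. A deliberately simplified, std-only sketch of that decision (the real value comes from provider.initial_flushed_entry_id and the log store):

/// Hypothetical simplification of the rule in the hunk above: regions created
/// on a remote WAL topic start with the topic's current latest entry id as
/// their flushed entry id; local-WAL regions keep starting from 0.
fn initial_flushed_entry_id(is_remote_wal: bool, topic_latest_entry_id: u64) -> u64 {
    if is_remote_wal {
        topic_latest_entry_id
    } else {
        0
    }
}

fn main() {
    assert_eq!(initial_flushed_entry_id(true, 99), 99);
    assert_eq!(initial_flushed_entry_id(false, 99), 0);
}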
@@ -439,7 +442,7 @@ impl RegionOpener {
.build(); .build();
let flushed_entry_id = version.flushed_entry_id; let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version)); let version_control = Arc::new(VersionControl::new(version));
if !self.skip_wal_replay { let topic_latest_entry_id = if !self.skip_wal_replay {
let replay_from_entry_id = self let replay_from_entry_id = self
.replay_checkpoint .replay_checkpoint
.unwrap_or_default() .unwrap_or_default()
@@ -461,14 +464,26 @@ impl RegionOpener {
on_region_opened, on_region_opened,
) )
.await?; .await?;
// For remote WAL, we need to set topic_latest_entry_id to current topic's latest entry id.
// Only set after the WAL replay is completed.
let topic_latest_entry_id = if provider.is_remote_wal()
&& version_control.current().version.memtables.is_empty()
{
wal.store().latest_entry_id(&provider).unwrap_or(0)
} else {
0
};
topic_latest_entry_id
} else { } else {
info!( info!(
"Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}", "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
region_id, manifest.manifest_version, flushed_entry_id region_id, manifest.manifest_version, flushed_entry_id
); );
}
let now = self.time_provider.current_time_millis();
0
};
let now = self.time_provider.current_time_millis();
let region = MitoRegion { let region = MitoRegion {
region_id: self.region_id, region_id: self.region_id,
version_control, version_control,
@@ -483,7 +498,7 @@ impl RegionOpener {
last_flush_millis: AtomicI64::new(now), last_flush_millis: AtomicI64::new(now),
last_compaction_millis: AtomicI64::new(now), last_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(), time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(0), topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
write_bytes: Arc::new(AtomicU64::new(0)), write_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder, memtable_builder,
stats: self.stats.clone(), stats: self.stats.clone(),
@@ -713,8 +728,8 @@ where
let series_count = version_control.current().series_count(); let series_count = version_control.current().series_count();
info!( info!(
"Replay WAL for region: {}, rows recovered: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}", "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
region_id, rows_replayed, last_entry_id, series_count, now.elapsed() region_id, provider, rows_replayed, replay_from_entry_id, last_entry_id, series_count, now.elapsed()
); );
Ok(last_entry_id) Ok(last_entry_id)
} }


@@ -371,7 +371,7 @@ impl VersionBuilder {
self self
} }
/// Sets truncated entty id. /// Sets truncated entry id.
pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self { pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
self.truncated_entry_id = entry_id; self.truncated_entry_id = entry_id;
self self


@@ -137,6 +137,14 @@ impl FilePurger for LocalFilePurger {
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}", error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
file_meta.file_id(), file_meta.region_id); file_meta.file_id(), file_meta.region_id);
} }
let file_id = file_meta.file_id();
if let Err(e) = sst_layer
.intermediate_manager()
.prune_sst_dir(&file_id.region_id(), &file_id.file_id())
.await
{
error!(e; "Failed to prune intermediate sst directory, region_id: {}, file_id: {}", file_id.region_id(), file_id.file_id());
}
})) { })) {
error!(e; "Failed to schedule the file purge request"); error!(e; "Failed to schedule the file purge request");
} }


@@ -110,6 +110,7 @@ pub struct Indexer {
last_mem_fulltext_index: usize, last_mem_fulltext_index: usize,
bloom_filter_indexer: Option<BloomFilterIndexer>, bloom_filter_indexer: Option<BloomFilterIndexer>,
last_mem_bloom_filter: usize, last_mem_bloom_filter: usize,
intermediate_manager: Option<IntermediateManager>,
} }
impl Indexer { impl Indexer {
@@ -196,6 +197,7 @@ impl IndexerBuilder for IndexerBuilderImpl {
indexer.inverted_indexer = self.build_inverted_indexer(file_id); indexer.inverted_indexer = self.build_inverted_indexer(file_id);
indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await; indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id); indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
indexer.intermediate_manager = Some(self.intermediate_manager.clone());
if indexer.inverted_indexer.is_none()
&& indexer.fulltext_indexer.is_none()
&& indexer.bloom_filter_indexer.is_none()


@@ -21,6 +21,7 @@ impl Indexer {
self.do_abort_inverted_index().await;
self.do_abort_fulltext_index().await;
self.do_abort_bloom_filter().await;
self.do_prune_intm_sst_dir().await;
self.puffin_manager = None;
}


@@ -54,6 +54,7 @@ impl Indexer {
return IndexOutput::default();
}
self.do_prune_intm_sst_dir().await;
output.file_size = self.do_finish_puffin_writer(writer).await;
output
}
@@ -270,4 +271,12 @@ impl Indexer {
output.row_count = row_count;
output.columns = column_ids;
}
pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
if let Some(manager) = self.intermediate_manager.take() {
if let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await {
warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id);
}
}
}
}

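The `do_prune_intm_sst_dir` method above takes the manager out of an `Option`, which makes the cleanup run at most once across the finish and abort paths. A self-contained sketch of that idiom, with simplified stand-in types:

// Sketch only: why `Option::take()` fits here; hypothetical types.
struct IntermediateManager;

impl IntermediateManager {
    fn prune_sst_dir(&self, region_id: u64, file_id: u64) {
        println!("pruning intermediate dir for region {region_id}, file {file_id}");
    }
}

struct Indexer {
    region_id: u64,
    file_id: u64,
    intermediate_manager: Option<IntermediateManager>,
}

impl Indexer {
    fn do_prune_intm_sst_dir(&mut self) {
        // `take()` leaves `None` behind, so a second call (finish after abort,
        // or abort after finish) becomes a no-op.
        if let Some(manager) = self.intermediate_manager.take() {
            manager.prune_sst_dir(self.region_id, self.file_id);
        }
    }

    fn finish(&mut self) {
        self.do_prune_intm_sst_dir();
    }

    fn abort(&mut self) {
        self.do_prune_intm_sst_dir();
    }
}

fn main() {
    let mut indexer = Indexer { region_id: 1, file_id: 7, intermediate_manager: Some(IntermediateManager) };
    indexer.finish();
    indexer.abort(); // prints nothing: the manager was already taken
}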

@@ -54,14 +54,22 @@ impl IntermediateManager {
aux_path.as_ref()
);
// Remove the intermediate directory in the background
let aux_pb = PathBuf::from(aux_path.as_ref());
let intm_dir = aux_pb.join(INTERMEDIATE_DIR);
let deleted_dir = intm_dir.with_extension(format!("deleted-{}", Uuid::new_v4()));
if let Err(err) = tokio::fs::rename(&intm_dir, &deleted_dir).await {
warn!(err; "Failed to rename intermediate directory");
}
tokio::spawn(async move {
if let Err(err) = tokio::fs::remove_dir_all(deleted_dir).await {
warn!(err; "Failed to remove intermediate directory");
}
});
let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
let store = InstrumentedStore::new(store);
-// Remove all garbage intermediate files from previous runs.
-if let Err(err) = store.remove_all(INTERMEDIATE_DIR).await {
-warn!(err; "Failed to remove garbage intermediate files");
-}
Ok(Self {
base_dir: PathBuf::from(aux_path.as_ref()),
store,
@@ -94,6 +102,24 @@ impl IntermediateManager {
.join(sst_file_id.to_string())
.join(format!("fulltext-{column_id}-{uuid}"))
}
/// Prunes the intermediate directory for SST files.
pub(crate) async fn prune_sst_dir(
&self,
region_id: &RegionId,
sst_file_id: &FileId,
) -> Result<()> {
let region_id = region_id.as_u64();
let sst_dir = format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/");
self.store.remove_all(&sst_dir).await
}
/// Prunes the intermediate directory for region files.
pub(crate) async fn prune_region_dir(&self, region_id: &RegionId) -> Result<()> {
let region_id = region_id.as_u64();
let region_dir = format!("{INTERMEDIATE_DIR}/{region_id}/");
self.store.remove_all(&region_dir).await
}
}
/// `IntermediateLocation` produces paths for intermediate files
@@ -268,6 +294,60 @@ mod tests {
.unwrap());
}
#[tokio::test]
async fn test_cleanup_dir() {
let temp_dir = temp_dir::create_temp_dir("test_cleanup_dir_");
let region_id = RegionId::new(0, 0);
let sst_file_id = FileId::random();
let region_dir = temp_dir
.path()
.join(INTERMEDIATE_DIR)
.join(region_id.as_u64().to_string());
let sst_dir = region_dir.join(sst_file_id.to_string());
let path = temp_dir.path().to_str().unwrap();
let manager = IntermediateManager::init_fs(path).await.unwrap();
let location = IntermediateLocation::new(&region_id, &sst_file_id);
let temp_file_provider = TempFileProvider::new(location, manager.clone());
let mut f1 = temp_file_provider
.create("sky", "000000000000")
.await
.unwrap();
f1.write_all(b"hello").await.unwrap();
f1.flush().await.unwrap();
f1.close().await.unwrap();
let mut f2 = temp_file_provider
.create("sky", "000000000001")
.await
.unwrap();
f2.write_all(b"world").await.unwrap();
f2.flush().await.unwrap();
f2.close().await.unwrap();
temp_file_provider.cleanup().await.unwrap();
// sst_dir and region_dir still exist
assert!(tokio::fs::try_exists(&sst_dir).await.unwrap());
assert!(tokio::fs::try_exists(&region_dir).await.unwrap());
// sst_dir should be deleted, region_dir still exists
manager
.prune_sst_dir(&region_id, &sst_file_id)
.await
.unwrap();
assert!(tokio::fs::try_exists(&region_dir).await.unwrap());
assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());
// sst_dir, region_dir should be deleted
manager.prune_region_dir(&region_id).await.unwrap();
assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());
assert!(!tokio::fs::try_exists(&region_dir).await.unwrap());
}
#[test]
fn test_intermediate_location() {
let sst_file_id = FileId::random();

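The init change above swaps a blocking recursive delete for a rename followed by a background removal, so startup is not stalled by a large leftover intermediate directory. A minimal sketch of the same pattern using only std (the real code uses tokio::fs, tokio::spawn, and a UUID suffix; the timestamp suffix here is just a stand-in):

// Sketch only: rename, then delete in the background.
use std::fs;
use std::path::Path;
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};

fn schedule_dir_removal(intm_dir: &Path) {
    // Renaming is cheap and atomic on the same filesystem, so the caller never
    // waits for the recursive delete.
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let deleted_dir = intm_dir.with_extension(format!("deleted-{nanos}"));
    if let Err(err) = fs::rename(intm_dir, &deleted_dir) {
        eprintln!("Failed to rename intermediate directory: {err}");
        return;
    }
    // The slow part happens off the startup path.
    thread::spawn(move || {
        if let Err(err) = fs::remove_dir_all(&deleted_dir) {
            eprintln!("Failed to remove intermediate directory: {err}");
        }
    });
}

fn main() {
    let dir = std::env::temp_dir().join("__intm");
    fs::create_dir_all(dir.join("region/sst")).unwrap();
    schedule_dir_removal(&dir);
    // Give the background thread a moment in this toy example.
    thread::sleep(std::time::Duration::from_millis(100));
}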

@@ -563,6 +563,7 @@ impl TestEnv {
if let Some(metadata) = initial_metadata {
RegionManifestManager::new(
metadata,
0,
manifest_opts,
Default::default(),
Default::default(),


@@ -116,6 +116,7 @@ impl SchedulerEnv {
Arc::new(ManifestContext::new(
RegionManifestManager::new(
metadata,
0,
RegionManifestOptions {
manifest_dir: "".to_string(),
object_store: self.access_layer.object_store().clone(),


@@ -65,7 +65,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if region.provider.is_remote_wal() {
let flushed_entry_id = region.version_control.current().last_entry_id;
-info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
let replay_from_entry_id = request
.checkpoint
.map(|c| c.entry_id)
.unwrap_or_default()
.max(flushed_entry_id);
info!("Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}", region.provider);
let timer = Instant::now();
let wal_entry_reader =
self.wal
@@ -75,15 +80,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&region.provider,
wal_entry_reader,
region_id,
-flushed_entry_id,
+replay_from_entry_id,
&region.version_control,
self.config.allow_stale_entries,
on_region_opened,
)
.await?;
info!(
-"Elapsed: {:?}, region: {region_id} catchup finished. last entry id: {last_entry_id}, expected: {:?}.",
+"Elapsed: {:?}, region: {region_id}, provider: {:?} catchup finished. replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.",
timer.elapsed(),
region.provider,
request.entry_id
);
if let Some(expected_last_entry_id) = request.entry_id {

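The catchup hunk above picks the replay starting point as the larger of the region's flushed entry id and the checkpoint carried by the request. A small sketch of that computation, assuming simplified stand-in request types:

// Sketch only: how the replay starting point is derived.
#[derive(Clone, Copy, Default)]
struct ReplayCheckpoint {
    entry_id: u64,
}

#[derive(Default)]
struct CatchupRequest {
    checkpoint: Option<ReplayCheckpoint>,
}

fn replay_from_entry_id(request: &CatchupRequest, flushed_entry_id: u64) -> u64 {
    // Entries at or below the flushed entry id are already durable, and the
    // checkpoint (when present) marks entries that no longer need replaying in
    // this sketch, so replay resumes from whichever is larger.
    request
        .checkpoint
        .map(|c| c.entry_id)
        .unwrap_or_default()
        .max(flushed_entry_id)
}

fn main() {
    let req = CatchupRequest { checkpoint: Some(ReplayCheckpoint { entry_id: 120 }) };
    assert_eq!(replay_from_entry_id(&req, 100), 120);
    assert_eq!(replay_from_entry_id(&CatchupRequest::default(), 100), 100);
}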

@@ -99,6 +99,7 @@ where
let object_store = region.access_layer.object_store().clone();
let dropping_regions = self.dropping_regions.clone();
let listener = self.listener.clone();
let intm_manager = self.intermediate_manager.clone();
common_runtime::spawn_global(async move {
let gc_duration = listener
.on_later_drop_begin(region_id)
@@ -111,6 +112,9 @@ where
gc_duration,
)
.await;
if let Err(err) = intm_manager.prune_region_dir(&region_id).await {
warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
}
listener.on_later_drop_end(region_id, removed);
});

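In the drop path above, the worker clones the intermediate-manager handle before moving it into the spawned task, and prunes the region's intermediate directory only after the region's files have been garbage collected. A sketch of that shape with hypothetical types, using a thread in place of the async task:

// Sketch only: clone a cheap handle, then move it into the background task.
use std::sync::Arc;
use std::thread;

#[derive(Clone)]
struct IntermediateManager {
    base_dir: Arc<String>, // cheap to clone: only the Arc is duplicated
}

impl IntermediateManager {
    fn prune_region_dir(&self, region_id: u64) {
        println!("prune {}/{}", self.base_dir.as_str(), region_id);
    }
}

struct Worker {
    intermediate_manager: IntermediateManager,
}

impl Worker {
    fn later_drop(&self, region_id: u64) -> thread::JoinHandle<()> {
        // Clone outside the closure; `self` must not move into the background task.
        let intm_manager = self.intermediate_manager.clone();
        thread::spawn(move || {
            // ... the region's SSTs and manifests would be garbage collected first ...
            intm_manager.prune_region_dir(region_id);
        })
    }
}

fn main() {
    let worker = Worker {
        intermediate_manager: IntermediateManager { base_dir: Arc::new("/tmp/intm".to_string()) },
    };
    worker.later_drop(1).join().unwrap();
}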

@@ -369,6 +369,9 @@ impl<H> BoundedStager<H> {
/// Note: It can't recover the mapping between puffin files and keys, so TTL
/// is configured to purge the dangling files and directories.
async fn recover(&self) -> Result<()> {
let timer = std::time::Instant::now();
info!("Recovering the staging area, base_dir: {:?}", self.base_dir);
let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
let mut elems = HashMap::new();
@@ -430,6 +433,7 @@ impl<H> BoundedStager<H> {
}
let mut size = 0;
let num_elems = elems.len();
for (key, value) in elems {
size += value.size();
self.cache.insert(key, value).await;
@@ -440,6 +444,12 @@ impl<H> BoundedStager<H> {
self.cache.run_pending_tasks().await;
info!(
"Recovered the staging area, num_entries: {}, num_bytes: {}, cost: {:?}",
num_elems,
size,
timer.elapsed()
);
Ok(())
}

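The staging-area change above is plain instrumentation: take a timestamp before scanning, remember `elems.len()` before the loop consumes the map by value, and log entries, bytes, and elapsed time once at the end. A minimal sketch:

// Sketch only: timing and counting a recovery pass.
use std::collections::HashMap;
use std::time::Instant;

fn recover(entries: HashMap<String, u64>) {
    let timer = Instant::now();
    println!("Recovering the staging area");

    // The length is captured before the loop takes ownership of the map.
    let num_elems = entries.len();
    let mut size = 0u64;
    for (_key, bytes) in entries {
        size += bytes; // in the real code this also inserts into the cache
    }

    println!(
        "Recovered the staging area, num_entries: {}, num_bytes: {}, cost: {:?}",
        num_elems,
        size,
        timer.elapsed()
    );
}

fn main() {
    recover(HashMap::from([("a.puffin".to_string(), 128), ("b.puffin".to_string(), 256)]));
}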

@@ -15,6 +15,7 @@
use std::fmt::Display;
use std::sync::Arc;
use crate::logstore::LogStore;
use crate::storage::RegionId;
// The Provider of kafka log store
@@ -78,6 +79,18 @@ impl Display for Provider {
}
impl Provider {
/// Returns the initial flushed entry id of the provider.
/// This is used to initialize the flushed entry id of the region when creating the region from scratch.
///
/// Currently only used for remote WAL.
/// For local WAL, the initial flushed entry id is 0.
pub fn initial_flushed_entry_id<S: LogStore>(&self, wal: &S) -> u64 {
if matches!(self, Provider::Kafka(_)) {
return wal.latest_entry_id(self).unwrap_or(0);
}
0
}
pub fn raft_engine_provider(id: u64) -> Provider {
Provider::RaftEngine(RaftEngineProvider { id })
}

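A sketch of the dispatch that `initial_flushed_entry_id` performs, with a toy log-store trait standing in for the real `LogStore` (the trait shape and the returned value below are assumptions of the sketch):

// Sketch only: Kafka providers ask the WAL, everything else starts at 0.
trait LogStore {
    /// Latest entry id known for the given provider, if any.
    fn latest_entry_id(&self, provider: &Provider) -> Option<u64>;
}

enum Provider {
    RaftEngine, // local WAL: regions start from entry id 0
    Kafka,      // remote WAL: the topic may already contain entries
}

impl Provider {
    fn initial_flushed_entry_id<S: LogStore>(&self, wal: &S) -> u64 {
        if matches!(self, Provider::Kafka) {
            // Fall back to 0 if the log store cannot report a latest entry id.
            return wal.latest_entry_id(self).unwrap_or(0);
        }
        0
    }
}

struct FakeKafka;

impl LogStore for FakeKafka {
    fn latest_entry_id(&self, _provider: &Provider) -> Option<u64> {
        Some(1234)
    }
}

fn main() {
    let wal = FakeKafka;
    assert_eq!(Provider::Kafka.initial_flushed_entry_id(&wal), 1234);
    assert_eq!(Provider::RaftEngine.initial_flushed_entry_id(&wal), 0);
}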

@@ -1358,7 +1358,7 @@ pub enum RegionTruncateRequest {
///
/// Makes a readonly region catch up to the leader region's changes.
/// There is no effect if it operates on a leader region.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
/// Sets it to writable if it's available after it has caught up with all changes.
pub set_writable: bool,
@@ -1371,6 +1371,8 @@ pub struct RegionCatchupRequest {
pub metadata_entry_id: Option<entry::Id>,
/// The hint for replaying memtable.
pub location_id: Option<u64>,
/// Replay checkpoint.
pub checkpoint: Option<ReplayCheckpoint>,
}
/// Get sequences of regions by region ids.

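With `Default` now derived and the `checkpoint` field added, call sites can construct a catchup request by filling only the fields they care about. A sketch with simplified stand-in types (`entry::Id` and `ReplayCheckpoint` are replaced by plain placeholders):

// Sketch only: struct-update syntax enabled by the added Default derive.
#[derive(Debug, Clone, Copy, Default)]
struct ReplayCheckpoint {
    entry_id: u64,
}

#[derive(Debug, Clone, Copy, Default)]
struct RegionCatchupRequest {
    set_writable: bool,
    entry_id: Option<u64>,
    metadata_entry_id: Option<u64>,
    location_id: Option<u64>,
    checkpoint: Option<ReplayCheckpoint>,
}

fn main() {
    // A catchup request that only supplies the replay checkpoint hint.
    let request = RegionCatchupRequest {
        set_writable: true,
        checkpoint: Some(ReplayCheckpoint { entry_id: 42 }),
        ..Default::default()
    };
    println!("{request:?}");
}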

@@ -15,11 +15,11 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
| Table    | Create Table                              |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" (   |
-|          |   "val" DOUBLE NULL,                      |
+|          |   "count(http_requests.val)" DOUBLE NULL, |
|          |   "ts" TIMESTAMP(3) NOT NULL,             |
|          |   "status_code" STRING NULL,              |
|          |   TIME INDEX ("ts"),                      |
@@ -28,7 +28,13 @@ SHOW CREATE TABLE cnt_reqs;
|          |                                           |
|          | ENGINE=mito                               |
|          |                                           |
+----------+-------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
++
++
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
@@ -80,6 +86,43 @@ DROP TABLE cnt_reqs;
Affected Rows: 0
CREATE TABLE http_requests_two_vals (
ts timestamp(3) time index,
host STRING,
idc STRING,
val DOUBLE,
valb DOUBLE,
PRIMARY KEY(host, idc),
);
Affected Rows: 0
-- should fail with two value columns error
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals);
Error: 3001(EngineExecuteQuery), Unsupported expr type: count_values on multi-value input
-- should fail with two value columns error
-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED]
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]);
Error: 3001(EngineExecuteQuery), Invalid query: TQL query only supports one f64 value column, table `greptime.public.http_requests_two_vals`(id=[REDACTED]) has 2 f64 value columns, columns are: [val Float64 null, valb Float64 null]
SHOW TABLES;
+------------------------+
| Tables |
+------------------------+
| http_requests_two_vals |
| numbers |
+------------------------+
DROP TABLE http_requests_two_vals;
Affected Rows: 0
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
@@ -114,11 +157,11 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
| Table    | Create Table                              |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" (   |
-|          |   "val" DOUBLE NULL,                      |
+|          |   "count(http_requests.val)" DOUBLE NULL, |
|          |   "ts" TIMESTAMP(3) NOT NULL,             |
|          |   "status_code" STRING NULL,              |
|          |   TIME INDEX ("ts"),                      |
@@ -127,7 +170,13 @@ SHOW CREATE TABLE cnt_reqs;
|          |                                           |
|          | ENGINE=mito                               |
|          |                                           |
+----------+-------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
++
++
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
@@ -158,11 +207,11 @@ ADMIN FLUSH_FLOW('calc_reqs');
| FLOW_FLUSHED                  |
+-------------------------------+
-SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
+SELECT * FROM cnt_reqs ORDER BY ts, status_code;
+--------------------------+---------------------+-------------+
-| val                      | ts                  | status_code |
+| count(http_requests.val) | ts                  | status_code |
+--------------------------+---------------------+-------------+
| 3.0                      | 1970-01-01T00:00:00 | 200.0       |
| 1.0                      | 1970-01-01T00:00:00 | 401.0       |
| 1.0                      | 1970-01-01T00:00:05 | 401.0       |
@@ -171,7 +220,7 @@ SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
| 2.0                      | 1970-01-01T00:00:10 | 200.0       |
| 2.0                      | 1970-01-01T00:00:10 | 201.0       |
| 4.0                      | 1970-01-01T00:00:15 | 500.0       |
+--------------------------+---------------------+-------------+
DROP FLOW calc_reqs;
@@ -199,18 +248,24 @@ Affected Rows: 0
SHOW CREATE TABLE rate_reqs;
+-----------+-----------------------------------------------------------+
| Table     | Create Table                                              |
+-----------+-----------------------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" (                  |
|           |   "ts" TIMESTAMP(3) NOT NULL,                             |
-|           |   "val" DOUBLE NULL,                                      |
+|           |   "prom_rate(ts_range,val,ts,Int64(300000))" DOUBLE NULL, |
|           |   TIME INDEX ("ts")                                       |
|           | )                                                         |
|           |                                                           |
|           | ENGINE=mito                                               |
|           |                                                           |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
++
++
INSERT INTO TABLE http_requests VALUES
(now() - '1m'::interval, 0),
@@ -248,3 +303,84 @@ DROP TABLE rate_reqs;
Affected Rows: 0
CREATE TABLE http_requests_total (
host STRING,
job STRING,
instance STRING,
byte DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (host, job, instance)
);
Affected Rows: 0
CREATE FLOW calc_rate
SINK TO rate_reqs
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]);
Affected Rows: 0
SHOW CREATE TABLE rate_reqs;
+-----------+-----------------------------------------------------------+
| Table | Create Table |
+-----------+-----------------------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "prom_rate(ts_range,byte,ts,Int64(60000))" DOUBLE NULL, |
| | "host" STRING NULL, |
| | "job" STRING NULL, |
| | "instance" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host", "job", "instance") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
++
++
INSERT INTO TABLE http_requests_total VALUES
('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval),
('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval),
('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval),
('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval),
('localhost', 'my_service', 'instance1', 400, now());
Affected Rows: 5
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_rate');
+-------------------------------+
| ADMIN FLUSH_FLOW('calc_rate') |
+-------------------------------+
| FLOW_FLUSHED |
+-------------------------------+
SELECT count(*)>0 FROM rate_reqs;
+---------------------+
| count(*) > Int64(0) |
+---------------------+
| true |
+---------------------+
DROP FLOW calc_rate;
Affected Rows: 0
DROP TABLE http_requests_total;
Affected Rows: 0
DROP TABLE rate_reqs;
Affected Rows: 0


@@ -22,15 +22,17 @@ INSERT INTO test VALUES
Affected Rows: 3
-- SQLNESS SLEEP 3s
-SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
+-- For regions using different WAL implementations, the manifest size may vary.
+-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest.
+SELECT SUM(region_rows), SUM(memtable_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');
+-------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
-| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.disk_size)     | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) |
+| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.memtable_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) |
+-------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
-| 3                                                     | 2699                                                     | 0                                                  | 0                                                    |
+| 3                                                     | 78                                                       | 0                                                  | 0                                                    |
+-------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';


@@ -17,7 +17,9 @@ INSERT INTO test VALUES
(21, 'c', 21);
-- SQLNESS SLEEP 3s
-SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
+-- For regions using different WAL implementations, the manifest size may vary.
+-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest.
+SELECT SUM(region_rows), SUM(memtable_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');


@@ -11,6 +11,9 @@ TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_
SHOW CREATE TABLE cnt_reqs;
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
(now() - '17s'::interval, 'host2', 'idc1', 200),
@@ -39,6 +42,28 @@ DROP FLOW calc_reqs;
DROP TABLE http_requests;
DROP TABLE cnt_reqs;
CREATE TABLE http_requests_two_vals (
ts timestamp(3) time index,
host STRING,
idc STRING,
val DOUBLE,
valb DOUBLE,
PRIMARY KEY(host, idc),
);
-- should fail with two value columns error
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals);
-- should fail with two value columns error
-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED]
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]);
SHOW TABLES;
DROP TABLE http_requests_two_vals;
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
@@ -64,6 +89,9 @@ TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("stat
SHOW CREATE TABLE cnt_reqs;
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
(0::Timestamp, 'host2', 'idc1', 200),
@@ -85,7 +113,7 @@ INSERT INTO TABLE http_requests VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_reqs');
-SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
+SELECT * FROM cnt_reqs ORDER BY ts, status_code;
DROP FLOW calc_reqs;
DROP TABLE http_requests;
@@ -101,6 +129,9 @@ TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]);
SHOW CREATE TABLE rate_reqs;
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
INSERT INTO TABLE http_requests VALUES
(now() - '1m'::interval, 0),
(now() - '30s'::interval, 1),
@@ -114,3 +145,38 @@ SELECT count(*) > 0 FROM rate_reqs;
DROP FLOW calc_rate;
DROP TABLE http_requests;
DROP TABLE rate_reqs;
CREATE TABLE http_requests_total (
host STRING,
job STRING,
instance STRING,
byte DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (host, job, instance)
);
CREATE FLOW calc_rate
SINK TO rate_reqs
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]);
SHOW CREATE TABLE rate_reqs;
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
INSERT INTO TABLE http_requests_total VALUES
('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval),
('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval),
('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval),
('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval),
('localhost', 'my_service', 'instance1', 400, now());
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_rate');
SELECT count(*)>0 FROM rate_reqs;
DROP FLOW calc_rate;
DROP TABLE http_requests_total;
DROP TABLE rate_reqs;