Add rayon
This commit is contained in:
parent
465fcfba0a
commit
9454aa3c68
4
.gitignore
vendored
4
.gitignore
vendored
@ -7,7 +7,3 @@ web/dist
|
|||||||
web/tmp
|
web/tmp
|
||||||
adapters
|
adapters
|
||||||
plugins
|
plugins
|
||||||
!qwdata/.gitkeep
|
|
||||||
qwdata
|
|
||||||
!grafana-storage/plugins
|
|
||||||
grafana-storage
|
|
||||||
|
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -7229,6 +7229,7 @@ dependencies = [
|
|||||||
"futures 0.3.28",
|
"futures 0.3.28",
|
||||||
"plugin-api",
|
"plugin-api",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
|
"rayon",
|
||||||
"reqwest 0.11.18",
|
"reqwest 0.11.18",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -18,3 +18,4 @@ reqwest = { version = "0.11.18", features = ['json', 'serde_json', 'stream', 'ru
|
|||||||
toml = { version = "0.7.4" }
|
toml = { version = "0.7.4" }
|
||||||
tracing = { version = "0.1.37" }
|
tracing = { version = "0.1.37" }
|
||||||
anyhow = { version = "1.0.71" }
|
anyhow = { version = "1.0.71" }
|
||||||
|
rayon = { version = "1.7.0" }
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
use actix_web::http::StatusCode;
|
|
||||||
use actix_web::{get, App, Error, HttpResponse, HttpServer};
|
use actix_web::{get, App, Error, HttpResponse, HttpServer};
|
||||||
|
use futures::prelude::*;
|
||||||
use plugin_api::{AppConfig, Plugin};
|
use plugin_api::{AppConfig, Plugin};
|
||||||
use rand::prelude::SliceRandom;
|
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use serde::*;
|
use serde::*;
|
||||||
use telemetry_plugin::TelemetryPlugin;
|
use telemetry_plugin::TelemetryPlugin;
|
||||||
@ -16,7 +15,7 @@ async fn main() -> std::io::Result<()> {
|
|||||||
let mut t = toml::Table::with_capacity(3);
|
let mut t = toml::Table::with_capacity(3);
|
||||||
t.insert(
|
t.insert(
|
||||||
"endpoint".to_string(),
|
"endpoint".to_string(),
|
||||||
toml::Value::String("http://localhost:16686".to_string()),
|
toml::Value::String("http://0.0.0.0:16686".to_string()),
|
||||||
);
|
);
|
||||||
t.insert(
|
t.insert(
|
||||||
"log_level".to_string(),
|
"log_level".to_string(),
|
||||||
@ -49,19 +48,19 @@ const BASE_API_URL: &'static str = "https://jsonplaceholder.typicode.com";
|
|||||||
#[get("/posts")]
|
#[get("/posts")]
|
||||||
async fn get_posts() -> Result<HttpResponse, Error> {
|
async fn get_posts() -> Result<HttpResponse, Error> {
|
||||||
// Randomly simulate errors in request handling
|
// Randomly simulate errors in request handling
|
||||||
let choices = [200, 400, 401, 200, 500, 501, 200, 500];
|
// let choices = [200, 400, 401, 200, 500, 501, 200, 500];
|
||||||
let mut rng = rand::thread_rng();
|
// let mut rng = rand::thread_rng();
|
||||||
let choice = choices.choose(&mut rng).unwrap().clone();
|
// let choice = choices.choose(&mut rng).unwrap().clone();
|
||||||
match choice {
|
// match choice {
|
||||||
400..=401 => Ok(HttpResponse::new(StatusCode::from_u16(choice).unwrap())),
|
// 400..=401 => Ok(HttpResponse::new(StatusCode::from_u16(choice).unwrap())),
|
||||||
500..=501 => Ok(HttpResponse::new(StatusCode::from_u16(choice).unwrap())),
|
// 500..=501 => Ok(HttpResponse::new(StatusCode::from_u16(choice).unwrap())),
|
||||||
_ => {
|
// _ => {
|
||||||
let posts = fetch_posts(20)
|
let posts = fetch_posts(20)
|
||||||
.await
|
.await
|
||||||
.map_err(actix_web::error::ErrorInternalServerError)?;
|
.map_err(actix_web::error::ErrorInternalServerError)?;
|
||||||
Ok(HttpResponse::Ok().json(posts))
|
Ok(HttpResponse::Ok().json(posts))
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetching posts with a limit.
|
// Fetching posts with a limit.
|
||||||
@ -77,11 +76,23 @@ async fn fetch_posts(limit: usize) -> anyhow::Result<Vec<Post>> {
|
|||||||
.map(|(idx, post)| (idx, post.id))
|
.map(|(idx, post)| (idx, post.id))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// fetch post comments one after another.
|
let mut res = futures::stream::iter(post_idx_to_ids.iter())
|
||||||
for (index, post_id) in post_idx_to_ids {
|
.map(|(idx, post_id)| {
|
||||||
let comments = fetch_comments(&client, post_id).await?;
|
let client = client.clone();
|
||||||
posts[index].comments = comments
|
async move {
|
||||||
|
let comments = fetch_comments(&client, *post_id).await.unwrap();
|
||||||
|
(*idx, comments)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
.buffer_unordered(3);
|
||||||
|
|
||||||
|
for (idx, comments) in res.next().await {
|
||||||
|
posts[idx].comments = comments;
|
||||||
|
}
|
||||||
|
// fetch post comments one after another.
|
||||||
|
// for (index, post_id) in post_idx_to_ids {
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
|
||||||
Ok(posts)
|
Ok(posts)
|
||||||
}
|
}
|
||||||
@ -89,8 +100,7 @@ async fn fetch_posts(limit: usize) -> anyhow::Result<Vec<Post>> {
|
|||||||
#[instrument(level = "info", name = "fetch_comments")]
|
#[instrument(level = "info", name = "fetch_comments")]
|
||||||
async fn fetch_comments(client: &Client, id: i64) -> anyhow::Result<Vec<Comment>> {
|
async fn fetch_comments(client: &Client, id: i64) -> anyhow::Result<Vec<Comment>> {
|
||||||
let url = format!("{BASE_API_URL}/posts/{id}/comments");
|
let url = format!("{BASE_API_URL}/posts/{id}/comments");
|
||||||
let mut comments: Vec<Comment> = client.get(url).send().await?.json().await?;
|
Ok(client.get(url).send().await?.json().await?)
|
||||||
Ok(comments)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
version: '3'
|
version: '3'
|
||||||
services:
|
services:
|
||||||
quickwit:
|
quickwit:
|
||||||
image: quickwit/quickwit:latest
|
image: quickwit/quickwit:v0.5.2
|
||||||
command: run
|
command: run
|
||||||
restart: always
|
restart: always
|
||||||
environment:
|
environment:
|
||||||
@ -10,11 +10,11 @@ services:
|
|||||||
ports:
|
ports:
|
||||||
- '7280:7280'
|
- '7280:7280'
|
||||||
- '7281:7281'
|
- '7281:7281'
|
||||||
volumes:
|
# volumes:
|
||||||
- ./qwdata:/quickwit/qwdata
|
# - ./qwdata:/quickwit/qwdata
|
||||||
|
|
||||||
jaeger:
|
jaeger:
|
||||||
image: jaegertracing/jaeger-query:latest
|
image: jaegertracing/jaeger-query:1.45
|
||||||
restart: always
|
restart: always
|
||||||
depends_on:
|
depends_on:
|
||||||
- quickwit
|
- quickwit
|
||||||
@ -25,9 +25,8 @@ services:
|
|||||||
- '16686:16686'
|
- '16686:16686'
|
||||||
|
|
||||||
grafana:
|
grafana:
|
||||||
image: grafana/grafana-enterprise:latest
|
image: grafana/grafana-enterprise:10.0.0
|
||||||
restart: always
|
restart: always
|
||||||
user: root
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- quickwit
|
- quickwit
|
||||||
environment:
|
environment:
|
||||||
@ -35,4 +34,4 @@ services:
|
|||||||
ports:
|
ports:
|
||||||
- '3000:3000'
|
- '3000:3000'
|
||||||
volumes:
|
volumes:
|
||||||
- ./grafana-storage:/var/lib/grafana
|
- ./grafana/plugins:/var/lib/grafana/plugins
|
||||||
|
@ -1,53 +0,0 @@
|
|||||||
|
|
||||||
{{ define "__subject" }}[{{ .Status | toUpper }}{{ if eq .Status "firing" }}:{{ .Alerts.Firing | len }}{{ if gt (.Alerts.Resolved | len) 0 }}, RESOLVED:{{ .Alerts.Resolved | len }}{{ end }}{{ end }}] {{ .GroupLabels.SortedPairs.Values | join " " }} {{ if gt (len .CommonLabels) (len .GroupLabels) }}({{ with .CommonLabels.Remove .GroupLabels.Names }}{{ .Values | join " " }}{{ end }}){{ end }}{{ end }}
|
|
||||||
|
|
||||||
{{ define "__text_values_list" }}{{ if len .Values }}{{ $first := true }}{{ range $refID, $value := .Values -}}
|
|
||||||
{{ if $first }}{{ $first = false }}{{ else }}, {{ end }}{{ $refID }}={{ $value }}{{ end -}}
|
|
||||||
{{ else }}[no value]{{ end }}{{ end }}
|
|
||||||
|
|
||||||
{{ define "__text_alert_list" }}{{ range . }}
|
|
||||||
Value: {{ template "__text_values_list" . }}
|
|
||||||
Labels:
|
|
||||||
{{ range .Labels.SortedPairs }} - {{ .Name }} = {{ .Value }}
|
|
||||||
{{ end }}Annotations:
|
|
||||||
{{ range .Annotations.SortedPairs }} - {{ .Name }} = {{ .Value }}
|
|
||||||
{{ end }}{{ if gt (len .GeneratorURL) 0 }}Source: {{ .GeneratorURL }}
|
|
||||||
{{ end }}{{ if gt (len .SilenceURL) 0 }}Silence: {{ .SilenceURL }}
|
|
||||||
{{ end }}{{ if gt (len .DashboardURL) 0 }}Dashboard: {{ .DashboardURL }}
|
|
||||||
{{ end }}{{ if gt (len .PanelURL) 0 }}Panel: {{ .PanelURL }}
|
|
||||||
{{ end }}{{ end }}{{ end }}
|
|
||||||
|
|
||||||
{{ define "default.title" }}{{ template "__subject" . }}{{ end }}
|
|
||||||
|
|
||||||
{{ define "default.message" }}{{ if gt (len .Alerts.Firing) 0 }}**Firing**
|
|
||||||
{{ template "__text_alert_list" .Alerts.Firing }}{{ if gt (len .Alerts.Resolved) 0 }}
|
|
||||||
|
|
||||||
{{ end }}{{ end }}{{ if gt (len .Alerts.Resolved) 0 }}**Resolved**
|
|
||||||
{{ template "__text_alert_list" .Alerts.Resolved }}{{ end }}{{ end }}
|
|
||||||
|
|
||||||
|
|
||||||
{{ define "__teams_text_alert_list" }}{{ range . }}
|
|
||||||
Value: {{ template "__text_values_list" . }}
|
|
||||||
Labels:
|
|
||||||
{{ range .Labels.SortedPairs }} - {{ .Name }} = {{ .Value }}
|
|
||||||
{{ end }}
|
|
||||||
Annotations:
|
|
||||||
{{ range .Annotations.SortedPairs }} - {{ .Name }} = {{ .Value }}
|
|
||||||
{{ end }}
|
|
||||||
{{ if gt (len .GeneratorURL) 0 }}Source: [{{ .GeneratorURL }}]({{ .GeneratorURL }})
|
|
||||||
|
|
||||||
{{ end }}{{ if gt (len .SilenceURL) 0 }}Silence: [{{ .SilenceURL }}]({{ .SilenceURL }})
|
|
||||||
|
|
||||||
{{ end }}{{ if gt (len .DashboardURL) 0 }}Dashboard: [{{ .DashboardURL }}]({{ .DashboardURL }})
|
|
||||||
|
|
||||||
{{ end }}{{ if gt (len .PanelURL) 0 }}Panel: [{{ .PanelURL }}]({{ .PanelURL }})
|
|
||||||
|
|
||||||
{{ end }}
|
|
||||||
{{ end }}{{ end }}
|
|
||||||
|
|
||||||
|
|
||||||
{{ define "teams.default.message" }}{{ if gt (len .Alerts.Firing) 0 }}**Firing**
|
|
||||||
{{ template "__teams_text_alert_list" .Alerts.Firing }}{{ if gt (len .Alerts.Resolved) 0 }}
|
|
||||||
|
|
||||||
{{ end }}{{ end }}{{ if gt (len .Alerts.Resolved) 0 }}**Resolved**
|
|
||||||
{{ template "__teams_text_alert_list" .Alerts.Resolved }}{{ end }}{{ end }}
|
|
@ -1 +0,0 @@
|
|||||||
ingest_partition_01H3FQJ4S8FCJRDTW2KNVMPKBF
|
|
Binary file not shown.
Loading…
Reference in New Issue
Block a user