Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions fact/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ pub struct BackoffConfig {
max: Option<Duration>,
jitter: Option<bool>,
multiplier: Option<f64>,
retries_max: Option<u64>,
}

impl BackoffConfig {
Expand All @@ -348,6 +349,9 @@ impl BackoffConfig {
if let Some(multiplier) = from.multiplier {
self.multiplier = Some(multiplier);
}
if let Some(retries) = from.retries_max {
self.retries_max = Some(retries);
}
}

pub fn initial(&self) -> Duration {
Expand All @@ -365,6 +369,10 @@ impl BackoffConfig {
/// Backoff multiplier to use.
///
/// Returns the configured `multiplier` when one is set, otherwise the
/// built-in default of `1.5`.
pub fn multiplier(&self) -> f64 {
    // Fallback applied when no multiplier has been configured.
    const DEFAULT_MULTIPLIER: f64 = 1.5;

    self.multiplier.unwrap_or(DEFAULT_MULTIPLIER)
}

/// Maximum number of retries to use.
///
/// Returns the configured `retries_max` when one is set, otherwise the
/// built-in default of `10`.
pub fn retries(&self) -> u64 {
    // Fallback applied when no retry limit has been configured.
    const DEFAULT_RETRIES_MAX: u64 = 10;

    self.retries_max.unwrap_or(DEFAULT_RETRIES_MAX)
}
}

impl TryFrom<&yaml::Hash> for BackoffConfig {
Expand Down Expand Up @@ -405,6 +413,12 @@ impl TryFrom<&yaml::Hash> for BackoffConfig {
};
backoff.multiplier = Some(multiplier);
}
"retries" => {
let Some(retries) = v.as_i64().filter(|r| *r >= 0) else {
bail!("invalid grpc.backoff.retries: {v:?}");
};
backoff.retries_max = Some(retries as u64);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"),
}
}
Expand Down Expand Up @@ -606,6 +620,13 @@ pub struct FactCli {
#[arg(long, env = "FACT_GRPC_BACKOFF_JITTER")]
backoff_jitter: Option<bool>,

/// Maximum number of times a gRPC connection will be attempted
/// before giving up
///
/// 0 means infinite retries
#[arg(long, env = "FACT_GRPC_BACKOFF_RETRIES_MAX")]
backoff_retries_max: Option<u64>,

/// The port to bind for all exposed endpoints
#[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")]
address: Option<SocketAddr>,
Expand Down Expand Up @@ -701,6 +722,7 @@ impl FactCli {
max: self.backoff_max,
jitter: self.backoff_jitter,
multiplier: self.backoff_multiplier,
retries_max: self.backoff_retries_max,
},
},
endpoint: EndpointConfig {
Expand Down
110 changes: 110 additions & 0 deletions fact/src/config/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,23 @@ fn parsing() {
..Default::default()
},
),
(
r#"
grpc:
backoff:
retries: 5
"#,
FactConfig {
grpc: GrpcConfig {
backoff: BackoffConfig {
retries_max: Some(5),
..Default::default()
},
..Default::default()
},
..Default::default()
},
),
(
r#"
grpc:
Expand All @@ -348,6 +365,7 @@ fn parsing() {
max: 120
jitter: false
multiplier: 2
retries: 5
"#,
FactConfig {
grpc: GrpcConfig {
Expand All @@ -356,6 +374,7 @@ fn parsing() {
max: Some(Duration::from_secs(120)),
jitter: Some(false),
multiplier: Some(2.0),
retries_max: Some(5),
},
..Default::default()
},
Expand All @@ -374,6 +393,7 @@ fn parsing() {
max: 120
jitter: false
multiplier: 2
retries: 5
endpoint:
address: 0.0.0.0:8080
expose_metrics: true
Expand All @@ -396,6 +416,7 @@ fn parsing() {
max: Some(Duration::from_secs(120)),
jitter: Some(false),
multiplier: Some(2.0),
retries_max: Some(5),
},
},
endpoint: EndpointConfig {
Expand Down Expand Up @@ -543,6 +564,30 @@ paths:
"#,
"invalid grpc.backoff.multiplier: Real(\"0.5\")",
),
(
r#"
grpc:
backoff:
retries: 0.5
"#,
"invalid grpc.backoff.retries: Real(\"0.5\")",
),
(
r#"
grpc:
backoff:
retries: true
"#,
"invalid grpc.backoff.retries: Boolean(true)",
),
(
r#"
grpc:
backoff:
retries: -10
"#,
"invalid grpc.backoff.retries: Integer(-10)",
),
(
r#"
grpc:
Expand Down Expand Up @@ -1058,6 +1103,51 @@ fn update() {
..Default::default()
},
),
(
r#"
grpc:
backoff:
retries: 5
"#,
FactConfig::default(),
FactConfig {
grpc: GrpcConfig {
backoff: BackoffConfig {
retries_max: Some(5),
..Default::default()
},
..Default::default()
},
..Default::default()
},
),
(
r#"
grpc:
backoff:
retries: 5
"#,
FactConfig {
grpc: GrpcConfig {
backoff: BackoffConfig {
retries_max: Some(10),
..Default::default()
},
..Default::default()
},
..Default::default()
},
FactConfig {
grpc: GrpcConfig {
backoff: BackoffConfig {
retries_max: Some(5),
..Default::default()
},
..Default::default()
},
..Default::default()
},
),
(
r#"
endpoint:
Expand Down Expand Up @@ -1425,6 +1515,7 @@ fn update() {
max: 120
jitter: false
multiplier: 3.0
retries: 5
endpoint:
address: 127.0.0.1:8080
expose_metrics: true
Expand All @@ -1447,6 +1538,7 @@ fn update() {
max: Some(Duration::from_secs(30)),
jitter: Some(true),
multiplier: Some(2.0),
retries_max: Some(20),
},
},
endpoint: EndpointConfig {
Expand Down Expand Up @@ -1474,6 +1566,7 @@ fn update() {
max: Some(Duration::from_secs(120)),
jitter: Some(false),
multiplier: Some(3.0),
retries_max: Some(5),
},
},
endpoint: EndpointConfig {
Expand Down Expand Up @@ -1525,6 +1618,7 @@ fn defaults() {
assert_eq!(config.grpc.backoff.max(), Duration::from_secs(60));
assert!(config.grpc.backoff.jitter());
assert_eq!(config.grpc.backoff.multiplier(), 1.5);
assert_eq!(config.grpc.backoff.retries(), 10);
}

static ENV_MUTEX: Mutex<()> = Mutex::new(());
Expand Down Expand Up @@ -1756,6 +1850,22 @@ fn env_vars() {
..Default::default()
},
),
(
EnvVar {
name: "FACT_GRPC_BACKOFF_RETRIES_MAX",
value: "5",
},
FactConfig {
grpc: GrpcConfig {
backoff: BackoffConfig {
retries_max: Some(5),
..Default::default()
},
..Default::default()
},
..Default::default()
},
),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some tests with negative retries.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

(
EnvVar {
name: "FACT_ENDPOINT_ADDRESS",
Expand Down
63 changes: 31 additions & 32 deletions fact/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{borrow::BorrowMut, io::Write, str::FromStr};

use anyhow::Context;
use anyhow::{Context, bail};
use bpf::Bpf;
use host_info::{SystemInfo, get_distro, get_hostname};
use host_scanner::HostScanner;
Expand All @@ -10,6 +10,7 @@ use rate_limiter::RateLimiter;
use tokio::{
signal::unix::{SignalKind, signal},
sync::watch,
task::JoinError,
};

mod bpf;
Expand Down Expand Up @@ -64,6 +65,19 @@ pub fn log_system_information() {
info!("Hostname: {}", get_hostname());
}

/// Collapse the result of awaiting a worker task into a single
/// `anyhow::Result`.
///
/// `res` carries two layers of failure: the outer `JoinError` (the task
/// could not be joined) and the inner `anyhow::Result` returned by the
/// worker itself. Either failure is turned into an error whose message is
/// prefixed with `component`; only a worker that both joined and returned
/// `Ok` yields `Ok(())`.
fn flatten_task_result(
    component: &str,
    res: Result<anyhow::Result<()>, JoinError>,
) -> anyhow::Result<()> {
    // Peel off the join layer first: a failure here means the task itself
    // did not complete, independent of what the worker would have returned.
    let worker_outcome = match res {
        Ok(outcome) => outcome,
        Err(join_err) => bail!("{component} task errored out: {join_err:?}"),
    };

    // The task was joined; surface the worker's own error, if any.
    if let Err(worker_err) = worker_outcome {
        bail!("{component} worker errored out: {worker_err:?}");
    }

    Ok(())
}

pub async fn run(config: FactConfig) -> anyhow::Result<()> {
// Log system information as early as possible so we have it
// available in case of a crash
Expand Down Expand Up @@ -99,13 +113,13 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> {
exporter.metrics.rate_limiter.clone(),
)?;

output::start(
let mut output_handle = output::start(
rx,
running.subscribe(),
exporter.metrics.output.clone(),
reloader.grpc(),
reloader.config().json(),
)?;
);
let mut host_scanner_handle = host_scanner.start();
let mut rate_limiter_handle = rate_limiter.start();
endpoints::Server::new(exporter.clone(), reloader.endpoint(), running.subscribe()).start();
Expand All @@ -114,43 +128,28 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> {

let mut sigterm = signal(SignalKind::terminate())?;
let mut sighup = signal(SignalKind::hangup())?;
loop {
let res = loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => break,
_ = sigterm.recv() => break,
_ = tokio::signal::ctrl_c() => break Ok(()),
_ = sigterm.recv() => break Ok(()),
_ = sighup.recv() => config_trigger.notify_one(),
res = bpf_handle.borrow_mut() => {
match res {
Ok(res) => if let Err(e) = res {
warn!("BPF worker errored out: {e:?}");
}
Err(e) => warn!("BPF task errored out: {e:?}"),
}
break;
task_res = bpf_handle.borrow_mut() => {
break flatten_task_result("BPF", task_res);
}
task_res = host_scanner_handle.borrow_mut() => {
break flatten_task_result("HostScanner", task_res);
}
res = host_scanner_handle.borrow_mut() => {
match res {
Ok(res) => if let Err(e) = res {
warn!("HostScanner worker errored out: {e:?}");
}
Err(e) => warn!("HostScanner task errored out: {e:?}"),
}
break;
task_res = rate_limiter_handle.borrow_mut() => {
break flatten_task_result("Rate limiter", task_res);
}
res = rate_limiter_handle.borrow_mut() => {
match res {
Ok(res) => if let Err(e) = res {
warn!("Rate limiter worker errored out: {e:?}");
}
Err(e) => warn!("Rate limiter task errored out: {e:?}"),
}
break;
task_res = output_handle.borrow_mut() => {
break flatten_task_result("Output", task_res);
}
}
}
};

running.send(false)?;
info!("Exiting...");

Ok(())
res
}
Loading
Loading