manta_server/server/common/
kafka.rs

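//! Lazily-initialised Kafka producer used by the audit subsystem.
//!
//! The producer is a `FutureProducer` cached behind a `OnceLock`, so
//! the first audit message pays the connection cost and subsequent
//! messages reuse the same client. `produce_message` passes the
//! configured `delivery_wait_secs` (zero by default) as the timeout
//! argument of `FutureProducer::send` and awaits the returned future.
//!
//! NOTE(review): the original intent is that the audit path never
//! blocks the request that triggered it. Per the rdkafka documentation
//! the `send` timeout only bounds the enqueue retry while the local
//! queue is full; the awaited future resolves on the delivery report
//! (bounded by `message.timeout.ms`). Confirm the non-blocking claim.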
8
9use std::{fmt, sync::OnceLock, time::Duration};
10
11use manta_shared::common::error::MantaError;
12use rdkafka::{
13  ClientConfig,
14  producer::{FutureProducer, FutureRecord},
15};
16use serde::{Deserialize, Serialize};
17
18use crate::server::common::audit::Audit;
19
/// Fallback for `[auditor.kafka].message_timeout_ms` in `server.toml`:
/// the Kafka message delivery timeout, in milliseconds, used when the
/// key is absent.
const DEFAULT_KAFKA_MESSAGE_TIMEOUT_MS: u32 = 5_000;

/// Fallback for `[auditor.kafka].delivery_wait_secs` in `server.toml`:
/// the delivery-confirmation wait, in seconds, used when the key is
/// absent. Zero is intended to mean fire-and-forget; the value is
/// handed unchanged to `FutureProducer::send` as its timeout.
const DEFAULT_KAFKA_DELIVERY_WAIT_SECS: u64 = 0;

/// Serde `default` provider for `Kafka::message_timeout_ms`.
fn default_kafka_message_timeout_ms() -> u32 {
  DEFAULT_KAFKA_MESSAGE_TIMEOUT_MS
}

/// Serde `default` provider for `Kafka::delivery_wait_secs`.
fn default_kafka_delivery_wait_secs() -> u64 {
  DEFAULT_KAFKA_DELIVERY_WAIT_SECS
}
35
/// Kafka client configuration for audit message production.
///
/// Holds the broker list, topic and timeouts, plus a private cache for
/// the client. The [`FutureProducer`] is lazily created on the first
/// call to [`Audit::produce_message`] and reused for all subsequent
/// calls via an internal [`OnceLock`]. The cache is marked
/// `#[serde(skip)]`, so it is never serialized and is empty on a
/// freshly deserialized value.
#[derive(Serialize, Deserialize)]
pub struct Kafka {
  /// Bootstrap broker list, e.g. `vec!["kafka.example.com:9092"]`.
  /// The entries are joined with `,` to form `bootstrap.servers`.
  pub brokers: Vec<String>,
  /// Kafka topic that audit messages are published to.
  pub topic: String,
  /// librdkafka `message.timeout.ms`: how long a queued audit
  /// message tries to deliver before being dropped. Defaults to
  /// 5000 ms when absent from the configuration.
  #[serde(default = "default_kafka_message_timeout_ms")]
  pub message_timeout_ms: u32,
  /// Timeout, in seconds, passed to `FutureProducer::send` by
  /// `produce_message`. Zero (default) is intended as fire-and-forget.
  ///
  /// NOTE(review): per the rdkafka docs this timeout only bounds the
  /// enqueue retry when the producer queue is full, not the wait for
  /// the delivery report — confirm the intended semantics.
  #[serde(default = "default_kafka_delivery_wait_secs")]
  pub delivery_wait_secs: u64,
  /// Lazily created producer; populated by the first successful
  /// `get_or_init_producer` call and never part of the serialized form.
  #[serde(skip)]
  producer: OnceLock<FutureProducer>,
}
58
59impl fmt::Debug for Kafka {
60  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61    f.debug_struct("Kafka")
62      .field("brokers", &self.brokers)
63      .field("topic", &self.topic)
64      .field(
65        "producer",
66        &if self.producer.get().is_some() {
67          "Some(<FutureProducer>)"
68        } else {
69          "None"
70        },
71      )
72      .finish()
73  }
74}
75
76impl Clone for Kafka {
77  /// Clone the configuration only; the cached producer is
78  /// not cloned and will be lazily recreated.
79  fn clone(&self) -> Self {
80    Self {
81      brokers: self.brokers.clone(),
82      topic: self.topic.clone(),
83      message_timeout_ms: self.message_timeout_ms,
84      delivery_wait_secs: self.delivery_wait_secs,
85      producer: OnceLock::new(),
86    }
87  }
88}
89
90impl Kafka {
91  /// Create a new `Kafka` instance with the given broker
92  /// list and topic name.
93  ///
94  /// The actual `FutureProducer` is built lazily on the first
95  /// `produce_message` call, so this constructor is cheap and
96  /// infallible.
97  pub fn new(brokers: Vec<String>, topic: String) -> Self {
98    Self {
99      brokers,
100      topic,
101      message_timeout_ms: DEFAULT_KAFKA_MESSAGE_TIMEOUT_MS,
102      delivery_wait_secs: DEFAULT_KAFKA_DELIVERY_WAIT_SECS,
103      producer: OnceLock::new(),
104    }
105  }
106
107  /// Return the cached [`FutureProducer`], creating it on
108  /// first call.
109  fn get_or_init_producer(&self) -> Result<&FutureProducer, MantaError> {
110    if let Some(p) = self.producer.get() {
111      return Ok(p);
112    }
113    let brokers = self.brokers.join(",");
114    let p: FutureProducer = ClientConfig::new()
115      .set("bootstrap.servers", &brokers)
116      .set("message.timeout.ms", self.message_timeout_ms.to_string())
117      .create()
118      .map_err(|e| {
119        MantaError::KafkaError(format!("Failed to create Kafka producer: {e}"))
120      })?;
121    // Another thread may have raced us; either value is
122    // fine since they are configured identically.
123    Ok(self.producer.get_or_init(|| p))
124  }
125}
126
127impl Audit for Kafka {
128  async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError> {
129    let producer = self.get_or_init_producer()?;
130
131    let delivery_status = producer
132      .send::<Vec<u8>, _, _>(
133        FutureRecord::to(&self.topic).payload(data),
134        Duration::from_secs(self.delivery_wait_secs),
135      )
136      .await;
137
138    match delivery_status {
139      Ok(_) => {
140        tracing::info!("Delivery status for message received");
141      }
142      Err(e) => {
143        return Err(MantaError::KafkaError(format!(
144          "Delivery status for message failed: {:?}",
145          e.0
146        )));
147      }
148    }
149
150    Ok(())
151  }
152}
153
#[cfg(test)]
mod tests {
  //! Tests for the non-IO parts of [`Kafka`]: configuration plumbing,
  //! clone semantics, and the redacted Debug representation.
  //! `produce_message` and `get_or_init_producer` require a broker
  //! (or a librdkafka mock) and are exercised via integration tests.

  use super::*;

  #[test]
  fn new_round_trips_brokers_and_topic() {
    let k = Kafka::new(
      vec!["broker1:9092".into(), "broker2:9092".into()],
      "audit-events".into(),
    );
    assert_eq!(k.brokers, vec!["broker1:9092", "broker2:9092"]);
    assert_eq!(k.topic, "audit-events");
    assert!(
      k.producer.get().is_none(),
      "producer must be uninitialised on construction (lazy init)"
    );
  }

  #[test]
  fn new_applies_default_timeouts() {
    // `new` takes no timeout arguments, so it must fall back to the
    // same defaults serde uses when the keys are absent.
    let k = Kafka::new(vec!["b:9092".into()], "t".into());
    assert_eq!(k.message_timeout_ms, DEFAULT_KAFKA_MESSAGE_TIMEOUT_MS);
    assert_eq!(k.delivery_wait_secs, DEFAULT_KAFKA_DELIVERY_WAIT_SECS);
    assert_eq!(k.message_timeout_ms, default_kafka_message_timeout_ms());
    assert_eq!(k.delivery_wait_secs, default_kafka_delivery_wait_secs());
  }

  #[test]
  fn clone_resets_the_producer_cache() {
    // The `Clone` impl deliberately gives the clone a fresh, empty
    // `OnceLock` instead of copying the cached producer, so every
    // `Kafka` value lazily builds its own client. A future change
    // that shares the cache would alter the lazy-init contract; this
    // test makes that change deliberate.
    //
    // Only the uninitialised source state is covered here:
    // initialising the source needs a real producer, which is left to
    // the integration tests.
    let original = Kafka::new(vec!["b:9092".into()], "t".into());
    let cloned = original.clone();
    assert_eq!(cloned.brokers, original.brokers);
    assert_eq!(cloned.topic, original.topic);
    assert!(
      cloned.producer.get().is_none(),
      "cloned producer cache must be empty regardless of source state"
    );
  }

  #[test]
  fn clone_copies_non_default_timeouts() {
    // Use values that differ from the defaults so a `Clone` impl that
    // silently resets the timeouts cannot pass.
    let mut original = Kafka::new(vec!["b:9092".into()], "t".into());
    original.message_timeout_ms = 1234;
    original.delivery_wait_secs = 7;

    let cloned = original.clone();
    assert_eq!(cloned.message_timeout_ms, 1234);
    assert_eq!(cloned.delivery_wait_secs, 7);
  }

  #[test]
  fn debug_masks_the_producer_and_shows_init_state() {
    // The Debug impl deliberately substitutes a placeholder string
    // for the FutureProducer — librdkafka internals would otherwise
    // appear in log lines if a Kafka value is debug-printed. Pin
    // both the placeholder string and that brokers/topic remain
    // visible (they're not secret).
    let uninit = Kafka::new(vec!["b:9092".into()], "audit".into());
    let s = format!("{uninit:?}");
    assert!(s.contains("brokers"), "brokers field must be visible");
    assert!(s.contains("\"b:9092\""), "broker value must be visible");
    assert!(s.contains("audit"), "topic must be visible");
    assert!(
      s.contains("None"),
      "uninitialised producer must show as `None`, got: {s}"
    );
    // The literal placeholder string used for an initialised producer
    // is pinned indirectly: if `Some(<FutureProducer>)` ever leaks
    // through to Debug output of an uninit Kafka, this assertion
    // catches it.
    assert!(
      !s.contains("FutureProducer"),
      "uninitialised Kafka must not mention FutureProducer in Debug output"
    );
  }
}
219}