manta_server/server/common/
kafka.rs1use std::{fmt, sync::OnceLock, time::Duration};
10
11use manta_shared::common::error::MantaError;
12use rdkafka::{
13 ClientConfig,
14 producer::{FutureProducer, FutureRecord},
15};
16use serde::{Deserialize, Serialize};
17
18use crate::server::common::audit::Audit;
19
/// Default for [`Kafka::message_timeout_ms`], in milliseconds.
///
/// The value ends up in the producer's `message.timeout.ms` client setting.
const DEFAULT_KAFKA_MESSAGE_TIMEOUT_MS: u32 = 5_000;

/// Default for [`Kafka::delivery_wait_secs`], in seconds.
///
/// The value is turned into the `Duration` handed to `FutureProducer::send`.
const DEFAULT_KAFKA_DELIVERY_WAIT_SECS: u64 = 0;

/// Serde default provider for `Kafka::message_timeout_ms`.
///
/// Serde's `default = "..."` attribute needs a function path, so this simply
/// exposes [`DEFAULT_KAFKA_MESSAGE_TIMEOUT_MS`].
fn default_kafka_message_timeout_ms() -> u32 {
    DEFAULT_KAFKA_MESSAGE_TIMEOUT_MS
}

/// Serde default provider for `Kafka::delivery_wait_secs`.
///
/// Serde's `default = "..."` attribute needs a function path, so this simply
/// exposes [`DEFAULT_KAFKA_DELIVERY_WAIT_SECS`].
fn default_kafka_delivery_wait_secs() -> u64 {
    DEFAULT_KAFKA_DELIVERY_WAIT_SECS
}
35
36#[derive(Serialize, Deserialize)]
42pub struct Kafka {
43 pub brokers: Vec<String>,
45 pub topic: String,
47 #[serde(default = "default_kafka_message_timeout_ms")]
50 pub message_timeout_ms: u32,
51 #[serde(default = "default_kafka_delivery_wait_secs")]
54 pub delivery_wait_secs: u64,
55 #[serde(skip)]
56 producer: OnceLock<FutureProducer>,
57}
58
59impl fmt::Debug for Kafka {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.debug_struct("Kafka")
62 .field("brokers", &self.brokers)
63 .field("topic", &self.topic)
64 .field(
65 "producer",
66 &if self.producer.get().is_some() {
67 "Some(<FutureProducer>)"
68 } else {
69 "None"
70 },
71 )
72 .finish()
73 }
74}
75
76impl Clone for Kafka {
77 fn clone(&self) -> Self {
80 Self {
81 brokers: self.brokers.clone(),
82 topic: self.topic.clone(),
83 message_timeout_ms: self.message_timeout_ms,
84 delivery_wait_secs: self.delivery_wait_secs,
85 producer: OnceLock::new(),
86 }
87 }
88}
89
90impl Kafka {
91 pub fn new(brokers: Vec<String>, topic: String) -> Self {
98 Self {
99 brokers,
100 topic,
101 message_timeout_ms: DEFAULT_KAFKA_MESSAGE_TIMEOUT_MS,
102 delivery_wait_secs: DEFAULT_KAFKA_DELIVERY_WAIT_SECS,
103 producer: OnceLock::new(),
104 }
105 }
106
107 fn get_or_init_producer(&self) -> Result<&FutureProducer, MantaError> {
110 if let Some(p) = self.producer.get() {
111 return Ok(p);
112 }
113 let brokers = self.brokers.join(",");
114 let p: FutureProducer = ClientConfig::new()
115 .set("bootstrap.servers", &brokers)
116 .set("message.timeout.ms", self.message_timeout_ms.to_string())
117 .create()
118 .map_err(|e| {
119 MantaError::KafkaError(format!("Failed to create Kafka producer: {e}"))
120 })?;
121 Ok(self.producer.get_or_init(|| p))
124 }
125}
126
127impl Audit for Kafka {
128 async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError> {
129 let producer = self.get_or_init_producer()?;
130
131 let delivery_status = producer
132 .send::<Vec<u8>, _, _>(
133 FutureRecord::to(&self.topic).payload(data),
134 Duration::from_secs(self.delivery_wait_secs),
135 )
136 .await;
137
138 match delivery_status {
139 Ok(_) => {
140 tracing::info!("Delivery status for message received");
141 }
142 Err(e) => {
143 return Err(MantaError::KafkaError(format!(
144 "Delivery status for message failed: {:?}",
145 e.0
146 )));
147 }
148 }
149
150 Ok(())
151 }
152}
153
/// Unit tests for the parts of `Kafka` that need no broker: construction,
/// cloning and the `Debug` representation.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_round_trips_brokers_and_topic() {
        let k = Kafka::new(
            vec!["broker1:9092".into(), "broker2:9092".into()],
            "audit-events".into(),
        );
        assert_eq!(k.brokers, vec!["broker1:9092", "broker2:9092"]);
        assert_eq!(k.topic, "audit-events");
        assert!(
            k.producer.get().is_none(),
            "producer must be uninitialised on construction (lazy init)"
        );
    }

    #[test]
    fn new_applies_default_timeouts() {
        let k = Kafka::new(vec!["b:9092".into()], "t".into());
        assert_eq!(k.message_timeout_ms, DEFAULT_KAFKA_MESSAGE_TIMEOUT_MS);
        assert_eq!(k.delivery_wait_secs, DEFAULT_KAFKA_DELIVERY_WAIT_SECS);
        // The serde default providers must agree with what `new` uses.
        assert_eq!(default_kafka_message_timeout_ms(), k.message_timeout_ms);
        assert_eq!(default_kafka_delivery_wait_secs(), k.delivery_wait_secs);
    }

    #[test]
    fn clone_resets_the_producer_cache() {
        let mut original = Kafka::new(vec!["b:9092".into()], "t".into());
        // Use non-default timeouts so a clone that fell back to the defaults
        // would be caught.
        original.message_timeout_ms = 1234;
        original.delivery_wait_secs = 7;

        let cloned = original.clone();
        assert_eq!(cloned.brokers, original.brokers);
        assert_eq!(cloned.topic, original.topic);
        assert_eq!(cloned.message_timeout_ms, 1234);
        assert_eq!(cloned.delivery_wait_secs, 7);
        assert!(
            cloned.producer.get().is_none(),
            "cloned producer cache must be empty regardless of source state"
        );
    }

    #[test]
    fn debug_masks_the_producer_and_shows_init_state() {
        let uninit = Kafka::new(vec!["b:9092".into()], "audit".into());
        let s = format!("{uninit:?}");
        assert!(s.contains("brokers"), "brokers field must be visible");
        assert!(s.contains("\"b:9092\""), "broker value must be visible");
        assert!(s.contains("audit"), "topic must be visible");
        assert!(
            s.contains("None"),
            "uninitialised producer must show as `None`, got: {s}"
        );
        assert!(
            !s.contains("FutureProducer"),
            "uninitialised Kafka must not mention FutureProducer in Debug output"
        );
    }
}