1use axum::{
4 Json,
5 extract::{
6 Path, Query,
7 ws::{Message, WebSocket, WebSocketUpgrade},
8 },
9 http::StatusCode,
10 response::IntoResponse,
11};
12use futures::StreamExt;
13use manta_backend_dispatcher::{
14 interfaces::console::{ConsoleAttachment, ConsoleTrait, TermSize},
15 types::{K8sAuth, K8sDetails},
16};
17use tokio::io::AsyncWriteExt;
18use tokio::sync::mpsc::Sender;
19
20use super::{
21 ErrorResponse, RequestCtx, SiteHeader, require_k8s_url, require_vault,
22 to_handler_error,
23};
24use crate::service;
25
26pub use manta_shared::types::api::queries::ConsoleQuery;
31
32#[utoipa::path(get, path = "/nodes/{xname}/console", tag = "console",
34 params(("xname" = String, Path, description = "Node xname"), ConsoleQuery, SiteHeader),
35 security(("bearerAuth" = [])),
36 responses(
37 (status = 101, description = "WebSocket upgrade"),
38 (status = 401, description = "Unauthorized", body = ErrorResponse),
39 (status = 500, description = "Internal error", body = ErrorResponse),
40 (status = 501, description = "Vault or k8s not configured", body = ErrorResponse),
41 )
42)]
43#[tracing::instrument(skip_all, fields(xname = %xname))]
44pub async fn console_node_ws(
45 ctx: RequestCtx,
46 Path(xname): Path<String>,
47 Query(q): Query<ConsoleQuery>,
48 ws: WebSocketUpgrade,
49) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
50 let (k8s_api_url, vault_base_url, timeout) = {
53 let infra = ctx.infra();
54 let k = require_k8s_url(infra.k8s_api_url)?.to_string();
55 let v = require_vault(infra.vault_base_url)?.to_string();
56 service::authorization::validate_user_group_members_access(
60 &infra,
61 &ctx.token,
62 std::slice::from_ref(&xname),
63 )
64 .await
65 .map_err(to_handler_error)?;
66 (k, v, ctx.state.console_inactivity_timeout)
67 };
68
69 let k8s = K8sDetails {
70 api_url: k8s_api_url,
71 authentication: K8sAuth::Vault {
72 base_url: vault_base_url,
73 },
74 };
75
76 let RequestCtx {
79 state,
80 token,
81 site_name,
82 } = ctx;
83
84 Ok(ws.on_upgrade(move |socket| async move {
85 tracing::info!("WebSocket console opened for node {xname}");
86 if let Some(site) = state.sites.get(&site_name) {
87 match site
88 .backend
89 .attach_to_node_console(
90 &token,
91 &site_name,
92 &xname,
93 TermSize {
94 width: q.cols,
95 height: q.rows,
96 },
97 &k8s,
98 )
99 .await
100 {
101 Ok(ConsoleAttachment {
102 stdin,
103 stdout,
104 resize,
105 }) => {
106 run_console_bridge(socket, stdin, stdout, resize, timeout).await;
107 tracing::info!("WebSocket console closed for node {xname}");
108 }
109 Err(e) => {
110 tracing::error!("Failed to attach to node console {xname}: {e:#}");
111 }
112 }
113 }
114 }))
115}
116
117#[utoipa::path(get, path = "/sessions/{name}/console", tag = "console",
123 params(("name" = String, Path, description = "Session name"), ConsoleQuery, SiteHeader),
124 security(("bearerAuth" = [])),
125 responses(
126 (status = 101, description = "WebSocket upgrade"),
127 (status = 401, description = "Unauthorized", body = ErrorResponse),
128 (status = 500, description = "Internal error", body = ErrorResponse),
129 (status = 501, description = "Vault or k8s not configured", body = ErrorResponse),
130 )
131)]
132#[tracing::instrument(skip_all, fields(session = %name))]
133pub async fn console_session_ws(
134 ctx: RequestCtx,
135 Path(name): Path<String>,
136 Query(q): Query<ConsoleQuery>,
137 ws: WebSocketUpgrade,
138) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
139 let (k8s_api_url, vault_base_url, timeout) = {
142 let infra = ctx.infra();
143 let k = require_k8s_url(infra.k8s_api_url)?.to_string();
144 let v = require_vault(infra.vault_base_url)?.to_string();
145 service::session::validate_session_access(&infra, &ctx.token, &name)
150 .await
151 .map_err(to_handler_error)?;
152 service::session::validate_console_session(&infra, &ctx.token, &name)
153 .await
154 .map_err(to_handler_error)?;
155 (k, v, ctx.state.console_inactivity_timeout)
156 };
157
158 let k8s = K8sDetails {
159 api_url: k8s_api_url,
160 authentication: K8sAuth::Vault {
161 base_url: vault_base_url,
162 },
163 };
164
165 let RequestCtx {
167 state,
168 token,
169 site_name,
170 } = ctx;
171
172 Ok(ws.on_upgrade(move |socket| async move {
173 tracing::info!("WebSocket console opened for session {name}");
174 if let Some(site) = state.sites.get(&site_name) {
175 match site
176 .backend
177 .attach_to_session_console(
178 &token,
179 &site_name,
180 &name,
181 TermSize {
182 width: q.cols,
183 height: q.rows,
184 },
185 &k8s,
186 )
187 .await
188 {
189 Ok(ConsoleAttachment {
190 stdin,
191 stdout,
192 resize,
193 }) => {
194 run_console_bridge(socket, stdin, stdout, resize, timeout).await;
195 tracing::info!("WebSocket console closed for session {name}");
196 }
197 Err(e) => {
198 tracing::error!("Failed to attach to session console {name}: {e:#}");
199 }
200 }
201 }
202 }))
203}
204
205#[allow(async_fn_in_trait)]
211trait ConsoleSocket: Send + Unpin {
212 async fn recv(&mut self) -> Option<Result<Message, axum::Error>>;
213 async fn send(&mut self, msg: Message) -> Result<(), axum::Error>;
214}
215
216impl ConsoleSocket for WebSocket {
217 async fn recv(&mut self) -> Option<Result<Message, axum::Error>> {
218 WebSocket::recv(self).await
219 }
220 async fn send(&mut self, msg: Message) -> Result<(), axum::Error> {
221 WebSocket::send(self, msg).await
222 }
223}
224
225async fn run_console_bridge<S: ConsoleSocket>(
237 mut socket: S,
238 mut console_in: Box<dyn tokio::io::AsyncWrite + Unpin + Send>,
239 console_out: Box<dyn tokio::io::AsyncRead + Unpin + Send>,
240 resize: Sender<TermSize>,
241 inactivity_timeout: std::time::Duration,
242) {
243 let mut out_stream = tokio_util::io::ReaderStream::new(console_out);
244 let mut deadline = tokio::time::Instant::now() + inactivity_timeout;
245
246 loop {
247 tokio::select! {
248 msg = socket.recv() => {
249 match msg {
250 Some(Ok(Message::Binary(data))) => {
251 deadline = tokio::time::Instant::now() + inactivity_timeout;
252 if console_in.write_all(&data).await.is_err() { break; }
253 }
254 Some(Ok(Message::Text(text))) => {
255 deadline = tokio::time::Instant::now() + inactivity_timeout;
256 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text)
259 && v.get("type").and_then(|t| t.as_str()) == Some("resize")
260 {
261 let cols = v.get("cols").and_then(|c| c.as_u64()).unwrap_or(0);
262 let rows = v.get("rows").and_then(|r| r.as_u64()).unwrap_or(0);
263 if cols > 0 && rows > 0 && cols <= u64::from(u16::MAX) && rows <= u64::from(u16::MAX) {
264 #[allow(clippy::cast_possible_truncation)]
265 let size = TermSize {
266 width: cols as u16,
267 height: rows as u16,
268 };
269 let _ = resize.try_send(size);
273 }
274 continue;
275 }
276 if console_in.write_all(text.as_bytes()).await.is_err() { break; }
277 }
278 Some(Ok(Message::Close(_))) | None => break,
279 Some(Ok(_)) => {} Some(Err(_)) => break,
281 }
282 }
283 chunk = out_stream.next() => {
284 match chunk {
285 Some(Ok(data)) => {
286 if socket.send(Message::Binary(data)).await.is_err() { break; }
287 }
288 Some(Err(_)) | None => break,
289 }
290 }
291 () = tokio::time::sleep_until(deadline) => {
292 tracing::warn!("Console session idle for {:?}, closing", inactivity_timeout);
293 break;
294 }
295 }
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
311 use std::pin::Pin;
312 use std::sync::{Arc, Mutex};
313 use std::task::{Context, Poll};
314 use std::time::Duration;
315 use tokio::sync::mpsc;
316
317 struct CaptureWriter(Arc<Mutex<Vec<u8>>>);
320
321 impl tokio::io::AsyncWrite for CaptureWriter {
322 fn poll_write(
323 self: Pin<&mut Self>,
324 _: &mut Context<'_>,
325 buf: &[u8],
326 ) -> Poll<std::io::Result<usize>> {
327 self.0.lock().unwrap().extend_from_slice(buf);
328 Poll::Ready(Ok(buf.len()))
329 }
330 fn poll_flush(
331 self: Pin<&mut Self>,
332 _: &mut Context<'_>,
333 ) -> Poll<std::io::Result<()>> {
334 Poll::Ready(Ok(()))
335 }
336 fn poll_shutdown(
337 self: Pin<&mut Self>,
338 _: &mut Context<'_>,
339 ) -> Poll<std::io::Result<()>> {
340 Poll::Ready(Ok(()))
341 }
342 }
343
344 struct PendingReader;
349
350 impl tokio::io::AsyncRead for PendingReader {
351 fn poll_read(
352 self: Pin<&mut Self>,
353 _: &mut Context<'_>,
354 _: &mut tokio::io::ReadBuf<'_>,
355 ) -> Poll<std::io::Result<()>> {
356 Poll::Pending
357 }
358 }
359
360 struct MockSocket {
364 rx: mpsc::UnboundedReceiver<Result<Message, axum::Error>>,
365 tx: mpsc::UnboundedSender<Message>,
366 }
367
368 impl ConsoleSocket for MockSocket {
369 async fn recv(&mut self) -> Option<Result<Message, axum::Error>> {
370 self.rx.recv().await
371 }
372 async fn send(&mut self, msg: Message) -> Result<(), axum::Error> {
373 self.tx.send(msg).map_err(axum::Error::new)
374 }
375 }
376
377 fn new_mock_socket() -> (
378 MockSocket,
379 mpsc::UnboundedSender<Result<Message, axum::Error>>,
380 mpsc::UnboundedReceiver<Message>,
381 ) {
382 let (in_tx, in_rx) = mpsc::unbounded_channel();
383 let (out_tx, out_rx) = mpsc::unbounded_channel();
384 (
385 MockSocket {
386 rx: in_rx,
387 tx: out_tx,
388 },
389 in_tx,
390 out_rx,
391 )
392 }
393
394 async fn bridge_exited_within(
397 handle: &mut tokio::task::JoinHandle<()>,
398 cap: Duration,
399 ) -> bool {
400 tokio::select! {
401 _ = handle => true,
402 () = tokio::time::sleep(cap) => false,
403 }
404 }
405
406 #[tokio::test(start_paused = true)]
407 async fn inactivity_timeout_fires_when_no_traffic() {
408 let (socket, _in_tx, _out_rx) = new_mock_socket();
409 let console_in = Box::new(tokio::io::sink());
410 let console_out = Box::new(PendingReader);
411 let (resize_tx, _resize_rx) = mpsc::channel(8);
412
413 let mut handle = tokio::spawn(async move {
414 run_console_bridge(
415 socket,
416 console_in,
417 console_out,
418 resize_tx,
419 Duration::from_secs(60),
420 )
421 .await;
422 });
423
424 assert!(
426 !bridge_exited_within(&mut handle, Duration::from_secs(59)).await,
427 "bridge exited before the 60s inactivity timeout"
428 );
429 assert!(
431 bridge_exited_within(&mut handle, Duration::from_secs(5)).await,
432 "bridge did not exit after the inactivity timeout"
433 );
434 }
435
436 #[tokio::test(start_paused = true)]
437 async fn client_binary_message_resets_deadline() {
438 let (socket, in_tx, _out_rx) = new_mock_socket();
439 let console_in = Box::new(tokio::io::sink());
440 let console_out = Box::new(PendingReader);
441 let (resize_tx, _resize_rx) = mpsc::channel(8);
442
443 let mut handle = tokio::spawn(async move {
444 run_console_bridge(
445 socket,
446 console_in,
447 console_out,
448 resize_tx,
449 Duration::from_secs(60),
450 )
451 .await;
452 });
453
454 tokio::time::sleep(Duration::from_secs(59)).await;
456 in_tx
457 .send(Ok(Message::Binary(b"hi".to_vec().into())))
458 .unwrap();
459 tokio::task::yield_now().await;
462
463 assert!(
465 !bridge_exited_within(&mut handle, Duration::from_secs(31)).await,
466 "deadline was not reset by client binary message"
467 );
468 assert!(
470 bridge_exited_within(&mut handle, Duration::from_secs(35)).await,
471 "bridge did not exit after the reset deadline"
472 );
473 }
474
475 #[tokio::test(start_paused = true)]
476 async fn resize_text_forwards_to_resize_channel_and_resets_deadline() {
477 let (socket, in_tx, _out_rx) = new_mock_socket();
482 let written: Arc<Mutex<Vec<u8>>> = Default::default();
483 let console_in = Box::new(CaptureWriter(written.clone()));
484 let console_out = Box::new(PendingReader);
485 let (resize_tx, mut resize_rx) = mpsc::channel(8);
486
487 let mut handle = tokio::spawn(async move {
488 run_console_bridge(
489 socket,
490 console_in,
491 console_out,
492 resize_tx,
493 Duration::from_secs(60),
494 )
495 .await;
496 });
497
498 tokio::time::sleep(Duration::from_secs(59)).await;
499 in_tx
500 .send(Ok(Message::Text(
501 r#"{"type":"resize","cols":120,"rows":40}"#.into(),
502 )))
503 .unwrap();
504 tokio::task::yield_now().await;
505
506 assert!(
508 !bridge_exited_within(&mut handle, Duration::from_secs(30)).await,
509 "deadline was not reset by resize message"
510 );
511 assert!(
513 written.lock().unwrap().is_empty(),
514 "resize text frame was forwarded to console stdin (should be parsed)"
515 );
516 let size = resize_rx.try_recv().expect(
518 "resize message should have been forwarded to the resize channel",
519 );
520 assert_eq!(size.width, 120);
521 assert_eq!(size.height, 40);
522
523 handle.abort();
524 }
525
526 #[tokio::test(start_paused = true)]
527 async fn client_close_exits_loop_immediately() {
528 let (socket, in_tx, _out_rx) = new_mock_socket();
529 let console_in = Box::new(tokio::io::sink());
530 let console_out = Box::new(PendingReader);
531 let (resize_tx, _resize_rx) = mpsc::channel(8);
532
533 let mut handle = tokio::spawn(async move {
534 run_console_bridge(
535 socket,
536 console_in,
537 console_out,
538 resize_tx,
539 Duration::from_secs(3600),
540 )
541 .await;
542 });
543
544 in_tx.send(Ok(Message::Close(None))).unwrap();
545 assert!(
546 bridge_exited_within(&mut handle, Duration::from_secs(1)).await,
547 "bridge did not exit on Close frame"
548 );
549 }
550
551 #[tokio::test(start_paused = true)]
552 async fn server_to_client_data_does_not_reset_deadline() {
553 use tokio::io::AsyncReadExt;
559
560 let (socket, _in_tx, mut out_rx) = new_mock_socket();
561 let console_in = Box::new(tokio::io::sink());
562 let console_out =
567 Box::new(std::io::Cursor::new(b"chunk".to_vec()).chain(PendingReader));
568 let (resize_tx, _resize_rx) = mpsc::channel(8);
569
570 let mut handle = tokio::spawn(async move {
571 run_console_bridge(
572 socket,
573 console_in,
574 console_out,
575 resize_tx,
576 Duration::from_secs(60),
577 )
578 .await;
579 });
580
581 tokio::spawn(async move { while out_rx.recv().await.is_some() {} });
583
584 assert!(
588 bridge_exited_within(&mut handle, Duration::from_secs(65)).await,
589 "server-to-client data should NOT keep the deadline alive"
590 );
591 }
592}