manta_server/server/handlers/
console.rs

1//! WebSocket console handlers (node + session).
2
3use axum::{
4  Json,
5  extract::{
6    Path, Query,
7    ws::{Message, WebSocket, WebSocketUpgrade},
8  },
9  http::StatusCode,
10  response::IntoResponse,
11};
12use futures::StreamExt;
13use manta_backend_dispatcher::{
14  interfaces::console::{ConsoleAttachment, ConsoleTrait, TermSize},
15  types::{K8sAuth, K8sDetails},
16};
17use tokio::io::AsyncWriteExt;
18use tokio::sync::mpsc::Sender;
19
20use super::{
21  ErrorResponse, RequestCtx, SiteHeader, require_k8s_url, require_vault,
22  to_handler_error,
23};
24use crate::service;
25
26// ---------------------------------------------------------------------------
27// WS /api/v1/nodes/{xname}/console — Interactive node console
28// ---------------------------------------------------------------------------
29
30pub use manta_shared::types::api::queries::ConsoleQuery;
31
32/// `WS /api/v1/nodes/{xname}/console` — attach an interactive PTY console to a node via WebSocket.
33#[utoipa::path(get, path = "/nodes/{xname}/console", tag = "console",
34  params(("xname" = String, Path, description = "Node xname"), ConsoleQuery, SiteHeader),
35  security(("bearerAuth" = [])),
36  responses(
37    (status = 101, description = "WebSocket upgrade"),
38    (status = 401, description = "Unauthorized",                   body = ErrorResponse),
39    (status = 500, description = "Internal error",                 body = ErrorResponse),
40    (status = 501, description = "Vault or k8s not configured",    body = ErrorResponse),
41  )
42)]
43#[tracing::instrument(skip_all, fields(xname = %xname))]
44pub async fn console_node_ws(
45  ctx: RequestCtx,
46  Path(xname): Path<String>,
47  Query(q): Query<ConsoleQuery>,
48  ws: WebSocketUpgrade,
49) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
50  // Read what we need from the borrowed infra and authorize the xname;
51  // the borrow ends with the block.
52  let (k8s_api_url, vault_base_url, timeout) = {
53    let infra = ctx.infra();
54    let k = require_k8s_url(infra.k8s_api_url)?.to_string();
55    let v = require_vault(infra.vault_base_url)?.to_string();
56    // Authorization: caller must have group access to this xname.
57    // Without this an authenticated user with any group could open
58    // an interactive PTY on any node in the cluster.
59    service::authorization::validate_user_group_members_access(
60      &infra,
61      &ctx.token,
62      std::slice::from_ref(&xname),
63    )
64    .await
65    .map_err(to_handler_error)?;
66    (k, v, ctx.state.console_inactivity_timeout)
67  };
68
69  let k8s = K8sDetails {
70    api_url: k8s_api_url,
71    authentication: K8sAuth::Vault {
72      base_url: vault_base_url,
73    },
74  };
75
76  // Move owned state into the spawned WebSocket task. Cannot use
77  // `ctx.infra()` inside the closure because it borrows from ctx.
78  let RequestCtx {
79    state,
80    token,
81    site_name,
82  } = ctx;
83
84  Ok(ws.on_upgrade(move |socket| async move {
85    tracing::info!("WebSocket console opened for node {xname}");
86    if let Some(site) = state.sites.get(&site_name) {
87      match site
88        .backend
89        .attach_to_node_console(
90          &token,
91          &site_name,
92          &xname,
93          TermSize {
94            width: q.cols,
95            height: q.rows,
96          },
97          &k8s,
98        )
99        .await
100      {
101        Ok(ConsoleAttachment {
102          stdin,
103          stdout,
104          resize,
105        }) => {
106          run_console_bridge(socket, stdin, stdout, resize, timeout).await;
107          tracing::info!("WebSocket console closed for node {xname}");
108        }
109        Err(e) => {
110          tracing::error!("Failed to attach to node console {xname}: {e:#}");
111        }
112      }
113    }
114  }))
115}
116
117// ---------------------------------------------------------------------------
118// WS /api/v1/sessions/{name}/console — Interactive CFS session console
119// ---------------------------------------------------------------------------
120
121/// `WS /api/v1/sessions/{name}/console` — attach an interactive PTY console to a CFS session pod via WebSocket.
122#[utoipa::path(get, path = "/sessions/{name}/console", tag = "console",
123  params(("name" = String, Path, description = "Session name"), ConsoleQuery, SiteHeader),
124  security(("bearerAuth" = [])),
125  responses(
126    (status = 101, description = "WebSocket upgrade"),
127    (status = 401, description = "Unauthorized",                   body = ErrorResponse),
128    (status = 500, description = "Internal error",                 body = ErrorResponse),
129    (status = 501, description = "Vault or k8s not configured",    body = ErrorResponse),
130  )
131)]
132#[tracing::instrument(skip_all, fields(session = %name))]
133pub async fn console_session_ws(
134  ctx: RequestCtx,
135  Path(name): Path<String>,
136  Query(q): Query<ConsoleQuery>,
137  ws: WebSocketUpgrade,
138) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
139  // Validate vault/k8s presence, then authorize the caller against
140  // the session's target groups, then check session liveness.
141  let (k8s_api_url, vault_base_url, timeout) = {
142    let infra = ctx.infra();
143    let k = require_k8s_url(infra.k8s_api_url)?.to_string();
144    let v = require_vault(infra.vault_base_url)?.to_string();
145    // Authorization: the caller's accessible groups must overlap the
146    // session's target.groups. validate_console_session does NOT do
147    // this check (only "is image-type and running"), so without this
148    // call any authenticated user could attach to any image session.
149    service::session::validate_session_access(&infra, &ctx.token, &name)
150      .await
151      .map_err(to_handler_error)?;
152    service::session::validate_console_session(&infra, &ctx.token, &name)
153      .await
154      .map_err(to_handler_error)?;
155    (k, v, ctx.state.console_inactivity_timeout)
156  };
157
158  let k8s = K8sDetails {
159    api_url: k8s_api_url,
160    authentication: K8sAuth::Vault {
161      base_url: vault_base_url,
162    },
163  };
164
165  // Move owned state into the spawned WebSocket task.
166  let RequestCtx {
167    state,
168    token,
169    site_name,
170  } = ctx;
171
172  Ok(ws.on_upgrade(move |socket| async move {
173    tracing::info!("WebSocket console opened for session {name}");
174    if let Some(site) = state.sites.get(&site_name) {
175      match site
176        .backend
177        .attach_to_session_console(
178          &token,
179          &site_name,
180          &name,
181          TermSize {
182            width: q.cols,
183            height: q.rows,
184          },
185          &k8s,
186        )
187        .await
188      {
189        Ok(ConsoleAttachment {
190          stdin,
191          stdout,
192          resize,
193        }) => {
194          run_console_bridge(socket, stdin, stdout, resize, timeout).await;
195          tracing::info!("WebSocket console closed for session {name}");
196        }
197        Err(e) => {
198          tracing::error!("Failed to attach to session console {name}: {e:#}");
199        }
200      }
201    }
202  }))
203}
204
205/// Minimal abstraction over the WebSocket half of the bridge so the
206/// timeout / message-handling loop can be unit-tested against an
207/// in-process mock channel. Cancel-safety follows from the underlying
208/// `WebSocket::recv` / `WebSocket::send` (both documented cancel-safe)
209/// and from `tokio::sync::mpsc` for the test impl.
210#[allow(async_fn_in_trait)]
211trait ConsoleSocket: Send + Unpin {
212  async fn recv(&mut self) -> Option<Result<Message, axum::Error>>;
213  async fn send(&mut self, msg: Message) -> Result<(), axum::Error>;
214}
215
216impl ConsoleSocket for WebSocket {
217  async fn recv(&mut self) -> Option<Result<Message, axum::Error>> {
218    WebSocket::recv(self).await
219  }
220  async fn send(&mut self, msg: Message) -> Result<(), axum::Error> {
221    WebSocket::send(self, msg).await
222  }
223}
224
225/// Bridge a WebSocket connection to a console's stdin/stdout streams.
226///
227/// - Binary and text WS frames are forwarded as raw bytes to console stdin.
228/// - Text frames matching `{"type":"resize","cols":N,"rows":N}` are parsed
229///   and forwarded to `resize`; the backend implementation pushes them
230///   onto the underlying transport's resize channel (k8s exec subprotocol
231///   channel 4). The bytes are not forwarded to stdin.
232/// - Console stdout is forwarded as Binary WS frames.
233/// - Either side closing or erroring terminates the bridge.
234/// - The bridge closes automatically after `inactivity_timeout` of silence
235///   from the client, releasing the Kubernetes pod attachment.
236async fn run_console_bridge<S: ConsoleSocket>(
237  mut socket: S,
238  mut console_in: Box<dyn tokio::io::AsyncWrite + Unpin + Send>,
239  console_out: Box<dyn tokio::io::AsyncRead + Unpin + Send>,
240  resize: Sender<TermSize>,
241  inactivity_timeout: std::time::Duration,
242) {
243  let mut out_stream = tokio_util::io::ReaderStream::new(console_out);
244  let mut deadline = tokio::time::Instant::now() + inactivity_timeout;
245
246  loop {
247    tokio::select! {
248      msg = socket.recv() => {
249        match msg {
250          Some(Ok(Message::Binary(data))) => {
251            deadline = tokio::time::Instant::now() + inactivity_timeout;
252            if console_in.write_all(&data).await.is_err() { break; }
253          }
254          Some(Ok(Message::Text(text))) => {
255            deadline = tokio::time::Instant::now() + inactivity_timeout;
256            // Resize control messages are parsed and forwarded to the
257            // backend's resize channel; everything else goes to stdin.
258            if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text)
259              && v.get("type").and_then(|t| t.as_str()) == Some("resize")
260            {
261              let cols = v.get("cols").and_then(|c| c.as_u64()).unwrap_or(0);
262              let rows = v.get("rows").and_then(|r| r.as_u64()).unwrap_or(0);
263              if cols > 0 && rows > 0 && cols <= u64::from(u16::MAX) && rows <= u64::from(u16::MAX) {
264                #[allow(clippy::cast_possible_truncation)]
265                let size = TermSize {
266                  width: cols as u16,
267                  height: rows as u16,
268                };
269                // Drop the event on a full channel rather than blocking
270                // the bridge; resize is idempotent state, not a sequence
271                // of deltas.
272                let _ = resize.try_send(size);
273              }
274              continue;
275            }
276            if console_in.write_all(text.as_bytes()).await.is_err() { break; }
277          }
278          Some(Ok(Message::Close(_))) | None => break,
279          Some(Ok(_)) => {} // Ping/Pong handled by axum automatically
280          Some(Err(_)) => break,
281        }
282      }
283      chunk = out_stream.next() => {
284        match chunk {
285          Some(Ok(data)) => {
286            if socket.send(Message::Binary(data)).await.is_err() { break; }
287          }
288          Some(Err(_)) | None => break,
289        }
290      }
291      () = tokio::time::sleep_until(deadline) => {
292        tracing::warn!("Console session idle for {:?}, closing", inactivity_timeout);
293        break;
294      }
295    }
296  }
297}
298
#[cfg(test)]
mod tests {
  //! Unit tests for [`run_console_bridge`] on tokio's paused clock.
  //!
  //! Every test follows one recipe:
  //! - start the bridge with [`spawn_bridge`] over a [`MockSocket`];
  //! - use `tokio::io::sink()` or a [`CaptureWriter`] as console stdin;
  //! - use a [`PendingReader`] as console stdout, optionally preceded by a
  //!   finite byte prefix;
  //! - push client frames through the test-side sender and let virtual time
  //!   run with sleeps;
  //! - finally, check whether the bridge task has finished.

  use super::*;
  use std::pin::Pin;
  use std::sync::{Arc, Mutex};
  use std::task::{Context, Poll};
  use std::time::Duration;
  use tokio::sync::mpsc;

  /// Console stdin stand-in that appends every byte it is given to a shared
  /// buffer, so a test can inspect what (if anything) the bridge forwarded.
  struct CaptureWriter {
    captured: Arc<Mutex<Vec<u8>>>,
  }

  impl tokio::io::AsyncWrite for CaptureWriter {
    fn poll_write(
      self: Pin<&mut Self>,
      _: &mut Context<'_>,
      buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
      let accepted = buf.len();
      self.captured.lock().unwrap().extend_from_slice(buf);
      Poll::Ready(Ok(accepted))
    }

    fn poll_flush(
      self: Pin<&mut Self>,
      _: &mut Context<'_>,
    ) -> Poll<std::io::Result<()>> {
      Poll::Ready(Ok(()))
    }

    fn poll_shutdown(
      self: Pin<&mut Self>,
      _: &mut Context<'_>,
    ) -> Poll<std::io::Result<()>> {
      Poll::Ready(Ok(()))
    }
  }

  /// Console stdout stand-in whose reads never complete. `tokio::io::empty()`
  /// does not work here: it reports EOF on the first read, and EOF ends the
  /// bridge through its `None` arm before the test can do anything.
  struct PendingReader;

  impl tokio::io::AsyncRead for PendingReader {
    fn poll_read(
      self: Pin<&mut Self>,
      _: &mut Context<'_>,
      _: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
      Poll::Pending
    }
  }

  /// Channel-backed replacement for axum's `WebSocket`, used only in tests.
  struct MockSocket {
    /// Client → server frames, fed by the test.
    from_client: mpsc::UnboundedReceiver<Result<Message, axum::Error>>,
    /// Server → client frames, observed by the test.
    to_client: mpsc::UnboundedSender<Message>,
  }

  impl ConsoleSocket for MockSocket {
    async fn recv(&mut self) -> Option<Result<Message, axum::Error>> {
      self.from_client.recv().await
    }

    async fn send(&mut self, msg: Message) -> Result<(), axum::Error> {
      self.to_client.send(msg).map_err(axum::Error::new)
    }
  }

  /// Build a [`MockSocket`] plus the test's ends of its two channels: the
  /// sender for simulated client frames and the receiver for frames the
  /// bridge sends back.
  fn new_mock_socket() -> (
    MockSocket,
    mpsc::UnboundedSender<Result<Message, axum::Error>>,
    mpsc::UnboundedReceiver<Message>,
  ) {
    let (client_tx, from_client) = mpsc::unbounded_channel();
    let (to_client, client_rx) = mpsc::unbounded_channel();
    let socket = MockSocket {
      from_client,
      to_client,
    };
    (socket, client_tx, client_rx)
  }

  /// Run the bridge under test on its own task and hand back its handle.
  fn spawn_bridge(
    socket: MockSocket,
    console_in: Box<dyn tokio::io::AsyncWrite + Unpin + Send>,
    console_out: Box<dyn tokio::io::AsyncRead + Unpin + Send>,
    resize_tx: mpsc::Sender<TermSize>,
    idle_after: Duration,
  ) -> tokio::task::JoinHandle<()> {
    tokio::spawn(run_console_bridge(
      socket,
      console_in,
      console_out,
      resize_tx,
      idle_after,
    ))
  }

  /// `true` if the bridge task finishes within `cap` of (paused) time,
  /// `false` if `cap` elapses first.
  async fn bridge_exited_within(
    handle: &mut tokio::task::JoinHandle<()>,
    cap: Duration,
  ) -> bool {
    tokio::time::timeout(cap, handle).await.is_ok()
  }

  #[tokio::test(start_paused = true)]
  async fn inactivity_timeout_fires_when_no_traffic() {
    // Both test-side handles stay bound so neither channel closes early.
    let (socket, _client_tx, _server_rx) = new_mock_socket();
    let (resize_tx, _resize_rx) = mpsc::channel(8);
    let mut bridge = spawn_bridge(
      socket,
      Box::new(tokio::io::sink()),
      Box::new(PendingReader),
      resize_tx,
      Duration::from_secs(60),
    );

    // t = 59s: one second short of the deadline, bridge still running.
    let exited_early = bridge_exited_within(&mut bridge, Duration::from_secs(59)).await;
    assert!(
      !exited_early,
      "bridge exited before the 60s inactivity timeout"
    );

    // t = 64s: past the deadline, bridge must be gone.
    let exited_late = bridge_exited_within(&mut bridge, Duration::from_secs(5)).await;
    assert!(
      exited_late,
      "bridge did not exit after the inactivity timeout"
    );
  }

  #[tokio::test(start_paused = true)]
  async fn client_binary_message_resets_deadline() {
    let (socket, client_tx, _server_rx) = new_mock_socket();
    let (resize_tx, _resize_rx) = mpsc::channel(8);
    let mut bridge = spawn_bridge(
      socket,
      Box::new(tokio::io::sink()),
      Box::new(PendingReader),
      resize_tx,
      Duration::from_secs(60),
    );

    // t = 59s: a binary frame pushes the deadline out to t = 119s.
    tokio::time::sleep(Duration::from_secs(59)).await;
    client_tx
      .send(Ok(Message::Binary(b"hi".to_vec().into())))
      .unwrap();
    // Let the bridge task consume the frame before the clock moves again.
    tokio::task::yield_now().await;

    // t = 90s: past the original deadline, before the reset one.
    let exited_early = bridge_exited_within(&mut bridge, Duration::from_secs(31)).await;
    assert!(
      !exited_early,
      "deadline was not reset by client binary message"
    );

    // t = 125s: past the reset deadline.
    let exited_late = bridge_exited_within(&mut bridge, Duration::from_secs(35)).await;
    assert!(
      exited_late,
      "bridge did not exit after the reset deadline"
    );
  }

  #[tokio::test(start_paused = true)]
  async fn resize_text_forwards_to_resize_channel_and_resets_deadline() {
    // Three guarantees are checked together:
    //   - a client text frame (resize included) pushes the deadline out
    //   - the resize JSON is consumed by the bridge, never written to stdin
    //   - the parsed cols/rows arrive on the backend's resize channel
    let (socket, client_tx, _server_rx) = new_mock_socket();
    let captured: Arc<Mutex<Vec<u8>>> = Default::default();
    let (resize_tx, mut resize_rx) = mpsc::channel(8);
    let mut bridge = spawn_bridge(
      socket,
      Box::new(CaptureWriter {
        captured: Arc::clone(&captured),
      }),
      Box::new(PendingReader),
      resize_tx,
      Duration::from_secs(60),
    );

    // t = 59s: resize request; the deadline moves to t = 119s.
    tokio::time::sleep(Duration::from_secs(59)).await;
    client_tx
      .send(Ok(Message::Text(
        r#"{"type":"resize","cols":120,"rows":40}"#.into(),
      )))
      .unwrap();
    tokio::task::yield_now().await;

    // t = 89s: past the original deadline, bridge still running.
    let exited = bridge_exited_within(&mut bridge, Duration::from_secs(30)).await;
    assert!(!exited, "deadline was not reset by resize message");

    assert!(
      captured.lock().unwrap().is_empty(),
      "resize text frame was forwarded to console stdin (should be parsed)"
    );

    let size = resize_rx.try_recv().expect(
      "resize message should have been forwarded to the resize channel",
    );
    assert_eq!((size.width, size.height), (120, 40));

    bridge.abort();
  }

  #[tokio::test(start_paused = true)]
  async fn client_close_exits_loop_immediately() {
    let (socket, client_tx, _server_rx) = new_mock_socket();
    let (resize_tx, _resize_rx) = mpsc::channel(8);
    // An hour-long timeout rules the idle timer out as the exit cause.
    let mut bridge = spawn_bridge(
      socket,
      Box::new(tokio::io::sink()),
      Box::new(PendingReader),
      resize_tx,
      Duration::from_secs(3600),
    );

    client_tx.send(Ok(Message::Close(None))).unwrap();
    let exited = bridge_exited_within(&mut bridge, Duration::from_secs(1)).await;
    assert!(exited, "bridge did not exit on Close frame");
  }

  #[tokio::test(start_paused = true)]
  async fn server_to_client_data_does_not_reset_deadline() {
    // Pins current policy: the deadline tracks CLIENT inactivity only, so
    // console output flowing to the client does not keep the session
    // alive. An idle user watching scrolling logs still times out. If that
    // policy changes, this test makes the decision explicit.
    use tokio::io::AsyncReadExt;

    let (socket, _client_tx, mut server_rx) = new_mock_socket();
    let (resize_tx, _resize_rx) = mpsc::channel(8);
    // Console stdout yields one real chunk, then stays pending forever, so
    // the bridge forwards data once but is never ended by EOF. If server
    // data did reset the deadline, the bridge would outlive 60s.
    let mut bridge = spawn_bridge(
      socket,
      Box::new(tokio::io::sink()),
      Box::new(std::io::Cursor::new(b"chunk".to_vec()).chain(PendingReader)),
      resize_tx,
      Duration::from_secs(60),
    );

    // Keep draining whatever the bridge sends to the client.
    tokio::spawn(async move { while server_rx.recv().await.is_some() {} });

    // t = 65s with no client traffic: the deadline set at t = 0 has fired
    // even though server data went out early on.
    let exited = bridge_exited_within(&mut bridge, Duration::from_secs(65)).await;
    assert!(
      exited,
      "server-to-client data should NOT keep the deadline alive"
    );
  }
}