manta_server/server/handlers/
session.rs

//! Session handlers (get/create/delete + log streaming).
2
3use std::convert::Infallible;
4
5use axum::{
6  Json,
7  extract::{Path, Query},
8  http::StatusCode,
9  response::{
10    IntoResponse,
11    sse::{Event, KeepAlive, Sse},
12  },
13};
14use futures::{AsyncBufReadExt, StreamExt};
15use manta_backend_dispatcher::{
16  interfaces::cfs::CfsTrait,
17  types::{K8sAuth, K8sDetails},
18};
19use serde::Deserialize;
20use utoipa::{IntoParams, ToSchema};
21
22use super::{
23  ErrorResponse, RequestCtx, SiteHeader, require_k8s_url, require_vault,
24  serialize_or_500, to_handler_error, validate_repo_list_lengths,
25};
26use crate::service;
27
// ---------------------------------------------------------------------------
// GET /api/v1/sessions
// ---------------------------------------------------------------------------
31
32/// Query parameters for `GET /sessions`.
33#[derive(Deserialize, IntoParams)]
34pub struct SessionQuery {
35  /// HSM group whose sessions should be returned.
36  pub hsm_group: Option<String>,
37  /// Filter to sessions whose `ansible_limit` mentions any of these
38  /// comma-separated xnames.
39  pub xnames: Option<String>,
40  /// Lower-bound session age expressed as a duration string
41  /// (e.g. `"1h"`, `"2d"`).
42  pub min_age: Option<String>,
43  /// Upper-bound session age expressed as a duration string.
44  pub max_age: Option<String>,
45  /// Session type filter: `"image"` or `"runtime"`.
46  pub session_type: Option<String>,
47  /// Status filter: `"pending"`, `"running"`, or `"complete"`.
48  pub status: Option<String>,
49  /// Exact session name.
50  pub name: Option<String>,
51  /// Cap on the number of sessions returned (most recent first).
52  pub limit: Option<u8>,
53}
54
55/// GET /sessions — list CFS sessions with optional filters.
56#[utoipa::path(get, path = "/sessions", tag = "sessions",
57  params(SessionQuery, SiteHeader),
58  security(("bearerAuth" = [])),
59  responses(
60    (status = 200, description = "List of sessions", body = serde_json::Value),
61    (status = 400, description = "Bad request",      body = ErrorResponse),
62    (status = 401, description = "Unauthorized",     body = ErrorResponse),
63    (status = 500, description = "Internal error",   body = ErrorResponse),
64  )
65)]
66#[tracing::instrument(skip_all)]
67pub async fn get_sessions(
68  ctx: RequestCtx,
69  Query(q): Query<SessionQuery>,
70) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
71  let infra = ctx.infra();
72
73  let xnames = match q.xnames {
74    Some(expr) => crate::server::common::node_ops::resolve_hosts_expression(
75      infra.backend,
76      &ctx.token,
77      &expr,
78      false,
79    )
80    .await
81    .map_err(to_handler_error)?,
82    None => vec![],
83  };
84
85  let params = service::session::GetSessionParams {
86    hsm_group: q.hsm_group,
87    xnames,
88    min_age: q.min_age,
89    max_age: q.max_age,
90    session_type: q.session_type,
91    status: q.status,
92    name: q.name,
93    limit: q.limit,
94  };
95
96  let sessions = service::session::get_sessions(&infra, &ctx.token, &params)
97    .await
98    .map_err(to_handler_error)?;
99
100  Ok(Json(sessions))
101}
102
// ---------------------------------------------------------------------------
// DELETE /api/v1/sessions/{name} — with ?dry_run=true support
// ---------------------------------------------------------------------------
106
107/// Query parameters for `DELETE /sessions/{name}`.
108#[derive(Deserialize, IntoParams)]
109pub struct DeleteSessionQuery {
110  /// When true, return deletion context without actually deleting (default: false).
111  #[serde(default)]
112  pub dry_run: bool,
113}
114
115/// DELETE /sessions/{name} — cancel and delete a CFS session; `?dry_run=true` previews.
116#[utoipa::path(delete, path = "/sessions/{name}", tag = "sessions",
117  params(("name" = String, Path, description = "Session name"), DeleteSessionQuery, SiteHeader),
118  security(("bearerAuth" = [])),
119  responses(
120    (status = 200, description = "Session deleted or deletion preview", body = serde_json::Value),
121    (status = 401, description = "Unauthorized",                        body = ErrorResponse),
122    (status = 404, description = "Not found",                           body = ErrorResponse),
123    (status = 500, description = "Internal error",                      body = ErrorResponse),
124  )
125)]
126#[tracing::instrument(skip_all)]
127pub async fn delete_session(
128  ctx: RequestCtx,
129  Path(name): Path<String>,
130  Query(q): Query<DeleteSessionQuery>,
131) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
132  tracing::info!("delete_session name={} dry_run={}", name, q.dry_run);
133  let infra = ctx.infra();
134
135  let deletion_ctx =
136    service::session::prepare_session_deletion(&infra, &ctx.token, &name, None)
137      .await
138      .map_err(to_handler_error)?;
139
140  if q.dry_run {
141    return Ok((StatusCode::OK, Json(serialize_or_500(&deletion_ctx)?)));
142  }
143
144  service::session::execute_session_deletion(
145    &infra,
146    &ctx.token,
147    &deletion_ctx,
148    false,
149  )
150  .await
151  .map_err(to_handler_error)?;
152
153  Ok((StatusCode::OK, Json(serde_json::json!({ "deleted": name }))))
154}
155
// ---------------------------------------------------------------------------
// POST /api/v1/sessions — Create CFS session
// ---------------------------------------------------------------------------
159
160/// Request body for `POST /sessions`.
161#[derive(Deserialize, ToSchema)]
162pub struct CreateSessionRequest {
163  /// Explicit name for the CFS session and configuration; auto-generated when absent.
164  pub cfs_conf_sess_name: Option<String>,
165  /// Ansible playbook filename inside the repository.
166  pub playbook_yaml_file_name: Option<String>,
167  /// Target HSM group name.
168  pub hsm_group: Option<String>,
169  /// Git repository names (parallel-indexed with `repo_last_commit_ids`).
170  pub repo_names: Vec<String>,
171  /// Git commit SHAs matching each entry in `repo_names`.
172  pub repo_last_commit_ids: Vec<String>,
173  /// Ansible `--limit` expression to restrict which hosts are targeted.
174  pub ansible_limit: Option<String>,
175  /// Ansible verbosity level (e.g. `"-v"`, `"-vvv"`).
176  pub ansible_verbosity: Option<String>,
177  /// Extra arguments forwarded verbatim to `ansible-playbook`.
178  pub ansible_passthrough: Option<String>,
179}
180
181/// `POST /api/v1/sessions` — create a CFS session from one or more git repositories.
182#[utoipa::path(post, path = "/sessions", tag = "sessions",
183  params(SiteHeader),
184  request_body = CreateSessionRequest,
185  security(("bearerAuth" = [])),
186  responses(
187    (status = 201, description = "Session created",               body = serde_json::Value),
188    (status = 400, description = "Bad request",                   body = ErrorResponse),
189    (status = 401, description = "Unauthorized",                  body = ErrorResponse),
190    (status = 500, description = "Internal error",                body = ErrorResponse),
191    (status = 501, description = "Vault not configured",          body = ErrorResponse),
192  )
193)]
194#[tracing::instrument(skip_all)]
195pub async fn create_session(
196  ctx: RequestCtx,
197  Json(body): Json<CreateSessionRequest>,
198) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
199  validate_repo_list_lengths(&body.repo_names, &body.repo_last_commit_ids)?;
200  tracing::info!("create_session repos={:?}", body.repo_names);
201  let infra = ctx.infra();
202
203  // Authorization: requested HSM group must be accessible to the token.
204  if let Some(ref hsm_group) = body.hsm_group {
205    service::group::validate_hsm_group_access(&infra, &ctx.token, hsm_group)
206      .await
207      .map_err(to_handler_error)?;
208  }
209  // Authorization: every xname in ansible_limit must belong to a group
210  // the token can access.
211  if let Some(ref ansible_limit) = body.ansible_limit {
212    let xnames: Vec<String> = ansible_limit
213      .split(',')
214      .map(|s| s.trim().to_string())
215      .collect();
216    crate::server::common::authorization::validate_target_hsm_members(
217      infra.backend,
218      &ctx.token,
219      &xnames,
220    )
221    .await
222    .map_err(to_handler_error)?;
223  }
224
225  let vault_base_url = require_vault(infra.vault_base_url)?;
226
227  let gitea_token =
228    crate::server::common::vault::http_client::fetch_shasta_vcs_token(
229      &ctx.token,
230      vault_base_url,
231      infra.site_name,
232    )
233    .await
234    .map_err(to_handler_error)?;
235
236  let repo_name_refs: Vec<&str> = body
237    .repo_names
238    .iter()
239    .map(std::string::String::as_str)
240    .collect();
241  let repo_commit_refs: Vec<&str> = body
242    .repo_last_commit_ids
243    .iter()
244    .map(std::string::String::as_str)
245    .collect();
246
247  let (session_name, config_name) = service::session::create_cfs_session(
248    &infra,
249    &ctx.token,
250    &gitea_token,
251    body.cfs_conf_sess_name.as_deref(),
252    body.playbook_yaml_file_name.as_deref(),
253    body.hsm_group.as_deref(),
254    &repo_name_refs,
255    &repo_commit_refs,
256    body.ansible_limit.as_deref(),
257    body.ansible_verbosity.as_deref(),
258    body.ansible_passthrough.as_deref(),
259  )
260  .await
261  .map_err(to_handler_error)?;
262
263  Ok((
264    StatusCode::CREATED,
265    Json(serde_json::json!({
266      "session_name": session_name,
267      "configuration_name": config_name,
268    })),
269  ))
270}
271
// ---------------------------------------------------------------------------
// GET /api/v1/sessions/{name}/logs — Stream CFS session logs via SSE
// ---------------------------------------------------------------------------
275
276/// Query parameters for `GET /sessions/{name}/logs`.
277#[derive(Deserialize, IntoParams)]
278pub struct SessionLogsQuery {
279  /// When true, prefix each log line with its timestamp.
280  #[serde(default)]
281  pub timestamps: bool,
282}
283
284/// `GET /api/v1/sessions/{name}/logs` — stream CFS session pod logs via Server-Sent Events.
285#[utoipa::path(get, path = "/sessions/{name}/logs", tag = "sessions",
286  params(("name" = String, Path, description = "Session name"), SessionLogsQuery, SiteHeader),
287  security(("bearerAuth" = [])),
288  responses(
289    (status = 200, description = "SSE log stream"),
290    (status = 401, description = "Unauthorized",                   body = ErrorResponse),
291    (status = 500, description = "Internal error",                 body = ErrorResponse),
292    (status = 501, description = "Vault or k8s not configured",    body = ErrorResponse),
293  )
294)]
295#[tracing::instrument(skip_all)]
296pub async fn get_session_logs(
297  ctx: RequestCtx,
298  Path(name): Path<String>,
299  Query(q): Query<SessionLogsQuery>,
300) -> Result<
301  Sse<impl futures::Stream<Item = Result<Event, Infallible>>>,
302  (StatusCode, Json<ErrorResponse>),
303> {
304  let infra = ctx.infra();
305
306  let k8s_api_url = require_k8s_url(infra.k8s_api_url)?;
307  let vault_base_url = require_vault(infra.vault_base_url)?;
308
309  let k8s = K8sDetails {
310    api_url: k8s_api_url.to_string(),
311    authentication: K8sAuth::Vault {
312      base_url: vault_base_url.to_string(),
313    },
314  };
315
316  let logs_stream = infra
317    .backend
318    .get_session_logs_stream(
319      &ctx.token,
320      infra.site_name,
321      &name,
322      q.timestamps,
323      &k8s,
324    )
325    .await
326    .map_err(to_handler_error)?;
327
328  let sse_stream = logs_stream.lines().map(|result| {
329    Ok::<Event, Infallible>(
330      Event::default().data(result.unwrap_or_else(|e| format!("error: {e}"))),
331    )
332  });
333
334  Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
335}