manta_server/server/handlers/
session.rs

1//! Session handlers (get/create/delete + log streaming).
2
3use std::convert::Infallible;
4
5use axum::{
6  Json,
7  extract::{Path, Query},
8  http::StatusCode,
9  response::{
10    IntoResponse,
11    sse::{Event, KeepAlive, Sse},
12  },
13};
14use futures::{AsyncBufReadExt, StreamExt};
15use manta_backend_dispatcher::interfaces::cfs::CfsTrait;
16use manta_backend_dispatcher::types::{K8sAuth, K8sDetails};
17
18use super::{
19  ErrorResponse, RequestCtx, SiteHeader, require_k8s_url, require_vault,
20  serialize_or_500, to_handler_error, validate_repo_list_lengths,
21};
22use crate::service;
23
24// ---------------------------------------------------------------------------
25// GET /api/v1/sessions
26// ---------------------------------------------------------------------------
27
28pub use manta_shared::types::api::queries::{DeleteSessionQuery, SessionQuery};
29
30/// GET /sessions — list CFS sessions with optional filters.
31#[utoipa::path(get, path = "/sessions", tag = "sessions",
32  params(SessionQuery, SiteHeader),
33  security(("bearerAuth" = [])),
34  responses(
35    // CfsSessionGetResponse lives in manta-backend-dispatcher (third-party,
36    // no ToSchema) — kept as Value until upstream derives it.
37    (status = 200, description = "List of sessions", body = serde_json::Value),
38    (status = 400, description = "Bad request",      body = ErrorResponse),
39    (status = 401, description = "Unauthorized",     body = ErrorResponse),
40    (status = 500, description = "Internal error",   body = ErrorResponse),
41  )
42)]
43#[tracing::instrument(skip_all)]
44pub async fn get_sessions(
45  ctx: RequestCtx,
46  Query(q): Query<SessionQuery>,
47) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
48  let infra = ctx.infra();
49
50  let xnames: Vec<String> = q
51    .xnames
52    .map(|s| {
53      s.split(',')
54        .map(str::trim)
55        .filter(|v| !v.is_empty())
56        .map(str::to_string)
57        .collect()
58    })
59    .unwrap_or_default();
60
61  let params = service::session::GetSessionParams {
62    group: q.hsm_group,
63    xnames,
64    min_age: q.min_age,
65    max_age: q.max_age,
66    session_type: q.session_type,
67    status: q.status,
68    name: q.name,
69    limit: q.limit,
70  };
71
72  let sessions = service::session::get_sessions(&infra, &ctx.token, &params)
73    .await
74    .map_err(to_handler_error)?;
75
76  Ok(Json(sessions))
77}
78
79// ---------------------------------------------------------------------------
80// DELETE /api/v1/sessions/{name} — with ?dry_run=true support
81// ---------------------------------------------------------------------------
82
83/// DELETE /sessions/{name} — cancel and delete a CFS session; `?dry_run=true` previews.
84#[utoipa::path(delete, path = "/sessions/{name}", tag = "sessions",
85  params(("name" = String, Path, description = "Session name"), DeleteSessionQuery, SiteHeader),
86  security(("bearerAuth" = [])),
87  responses(
88    // dry_run/real result union — kept as Value until the union shape is formalised
89    (status = 200, description = "Session deleted or deletion preview", body = serde_json::Value),
90    (status = 401, description = "Unauthorized",                        body = ErrorResponse),
91    (status = 404, description = "Not found",                           body = ErrorResponse),
92    (status = 500, description = "Internal error",                      body = ErrorResponse),
93  )
94)]
95#[tracing::instrument(skip_all)]
96pub async fn delete_session(
97  ctx: RequestCtx,
98  Path(name): Path<String>,
99  Query(q): Query<DeleteSessionQuery>,
100) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
101  tracing::info!("delete_session name={} dry_run={}", name, q.dry_run);
102  let infra = ctx.infra();
103
104  let deletion_ctx =
105    service::session::prepare_session_deletion(&infra, &ctx.token, &name, None)
106      .await
107      .map_err(to_handler_error)?;
108
109  if q.dry_run {
110    return Ok((StatusCode::OK, Json(serialize_or_500(&deletion_ctx)?)));
111  }
112
113  service::session::execute_session_deletion(
114    &infra,
115    &ctx.token,
116    &deletion_ctx,
117    false,
118  )
119  .await
120  .map_err(to_handler_error)?;
121
122  Ok((StatusCode::OK, Json(serde_json::json!({ "deleted": name }))))
123}
124
125// ---------------------------------------------------------------------------
126// POST /api/v1/sessions — Create CFS session
127// ---------------------------------------------------------------------------
128
129pub use manta_shared::types::api::session::CreateSessionRequest;
130
131/// `POST /api/v1/sessions` — create a CFS session from one or more git repositories.
132#[utoipa::path(post, path = "/sessions", tag = "sessions",
133  params(SiteHeader),
134  request_body = CreateSessionRequest,
135  security(("bearerAuth" = [])),
136  responses(
137    (status = 201, description = "Session created",               body = manta_shared::types::api::responses::CreateSessionResponse),
138    (status = 400, description = "Bad request",                   body = ErrorResponse),
139    (status = 401, description = "Unauthorized",                  body = ErrorResponse),
140    (status = 500, description = "Internal error",                body = ErrorResponse),
141    (status = 501, description = "Vault not configured",          body = ErrorResponse),
142  )
143)]
144#[tracing::instrument(skip_all)]
145pub async fn create_session(
146  ctx: RequestCtx,
147  Json(body): Json<CreateSessionRequest>,
148) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
149  validate_repo_list_lengths(&body.repo_names, &body.repo_last_commit_ids)?;
150  tracing::info!("create_session repos={:?}", body.repo_names);
151  let infra = ctx.infra();
152
153  // Authorization: requested HSM group must be accessible to the token.
154  if let Some(ref hsm_group) = body.hsm_group {
155    service::authorization::validate_user_group_access(
156      &infra, &ctx.token, hsm_group,
157    )
158    .await
159    .map_err(to_handler_error)?;
160  }
161
162  // Authorization: every xname in ansible_limit must belong to a group
163  // the token can access.
164  if let Some(ref ansible_limit) = body.ansible_limit {
165    service::authorization::validate_ansible_limit_membership_access(
166      &infra,
167      &ctx.token,
168      ansible_limit,
169    )
170    .await
171    .map_err(to_handler_error)?;
172  }
173
174  let vault_base_url = require_vault(infra.vault_base_url)?;
175
176  let gitea_token =
177    crate::server::common::vault::http_client::get_shasta_vcs_token(
178      &ctx.token,
179      vault_base_url,
180      infra.site_name,
181    )
182    .await
183    .map_err(to_handler_error)?;
184
185  let repo_name_refs: Vec<&str> = body
186    .repo_names
187    .iter()
188    .map(std::string::String::as_str)
189    .collect();
190  let repo_commit_refs: Vec<&str> = body
191    .repo_last_commit_ids
192    .iter()
193    .map(std::string::String::as_str)
194    .collect();
195
196  let (session_name, config_name) = service::session::create_cfs_session(
197    &infra,
198    &ctx.token,
199    &gitea_token,
200    service::session::CreateCfsSessionParams {
201      cfs_conf_sess_name: body.cfs_conf_sess_name.as_deref(),
202      playbook_yaml_file_name: body.playbook_yaml_file_name.as_deref(),
203      group: body.hsm_group.as_deref(),
204      repo_names: &repo_name_refs,
205      repo_last_commit_ids: &repo_commit_refs,
206      ansible_limit: body.ansible_limit.as_deref(),
207      ansible_verbosity: body.ansible_verbosity.as_deref(),
208      ansible_passthrough: body.ansible_passthrough.as_deref(),
209    },
210  )
211  .await
212  .map_err(to_handler_error)?;
213
214  Ok((
215    StatusCode::CREATED,
216    Json(serde_json::json!({
217      "session_name": session_name,
218      "configuration_name": config_name,
219    })),
220  ))
221}
222
223// ---------------------------------------------------------------------------
224// GET /api/v1/sessions/{name}/logs — Stream CFS session logs via SSE
225// ---------------------------------------------------------------------------
226
227pub use manta_shared::types::api::queries::SessionLogsQuery;
228
229/// `GET /api/v1/sessions/{name}/logs` — stream CFS session pod logs via Server-Sent Events.
230#[utoipa::path(get, path = "/sessions/{name}/logs", tag = "sessions",
231  params(("name" = String, Path, description = "Session name"), SessionLogsQuery, SiteHeader),
232  security(("bearerAuth" = [])),
233  responses(
234    (status = 200, description = "SSE log stream"),
235    (status = 401, description = "Unauthorized",                   body = ErrorResponse),
236    (status = 500, description = "Internal error",                 body = ErrorResponse),
237    (status = 501, description = "Vault or k8s not configured",    body = ErrorResponse),
238  )
239)]
240#[tracing::instrument(skip_all)]
241pub async fn get_session_logs(
242  ctx: RequestCtx,
243  Path(name): Path<String>,
244  Query(q): Query<SessionLogsQuery>,
245) -> Result<
246  Sse<impl futures::Stream<Item = Result<Event, Infallible>>>,
247  (StatusCode, Json<ErrorResponse>),
248> {
249  let infra = ctx.infra();
250
251  let k8s_api_url = require_k8s_url(infra.k8s_api_url)?;
252  let vault_base_url = require_vault(infra.vault_base_url)?;
253
254  // Authorization: the caller's accessible groups must overlap the
255  // session's target.groups. Session logs frequently carry
256  // credentials, kernel-cmdline secrets, and ansible variable dumps;
257  // without this check any authenticated user could stream any
258  // session's logs.
259  service::session::validate_session_access(&infra, &ctx.token, &name)
260    .await
261    .map_err(to_handler_error)?;
262
263  let k8s = K8sDetails {
264    api_url: k8s_api_url.to_string(),
265    authentication: K8sAuth::Vault {
266      base_url: vault_base_url.to_string(),
267    },
268  };
269
270  let logs_stream = infra
271    .backend
272    .get_session_logs_stream(
273      &ctx.token,
274      infra.site_name,
275      &name,
276      q.timestamps,
277      &k8s,
278    )
279    .await
280    .map_err(to_handler_error)?;
281
282  let sse_stream = logs_stream.lines().map(|result| {
283    Ok::<Event, Infallible>(
284      Event::default().data(result.unwrap_or_else(|e| format!("error: {e}"))),
285    )
286  });
287
288  Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
289}