1use std::convert::Infallible;
4
5use axum::{
6 Json,
7 extract::{Path, Query},
8 http::StatusCode,
9 response::{
10 IntoResponse,
11 sse::{Event, KeepAlive, Sse},
12 },
13};
14use futures::{AsyncBufReadExt, StreamExt};
15use manta_backend_dispatcher::{
16 interfaces::cfs::CfsTrait,
17 types::{K8sAuth, K8sDetails},
18};
19use serde::Deserialize;
20use utoipa::{IntoParams, ToSchema};
21
22use super::{
23 ErrorResponse, RequestCtx, SiteHeader, require_k8s_url, require_vault,
24 serialize_or_500, to_handler_error, validate_repo_list_lengths,
25};
26use crate::service;
27
28#[derive(Deserialize, IntoParams)]
34pub struct SessionQuery {
35 pub hsm_group: Option<String>,
37 pub xnames: Option<String>,
40 pub min_age: Option<String>,
43 pub max_age: Option<String>,
45 pub session_type: Option<String>,
47 pub status: Option<String>,
49 pub name: Option<String>,
51 pub limit: Option<u8>,
53}
54
55#[utoipa::path(get, path = "/sessions", tag = "sessions",
57 params(SessionQuery, SiteHeader),
58 security(("bearerAuth" = [])),
59 responses(
60 (status = 200, description = "List of sessions", body = serde_json::Value),
61 (status = 400, description = "Bad request", body = ErrorResponse),
62 (status = 401, description = "Unauthorized", body = ErrorResponse),
63 (status = 500, description = "Internal error", body = ErrorResponse),
64 )
65)]
66#[tracing::instrument(skip_all)]
67pub async fn get_sessions(
68 ctx: RequestCtx,
69 Query(q): Query<SessionQuery>,
70) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
71 let infra = ctx.infra();
72
73 let xnames = match q.xnames {
74 Some(expr) => crate::server::common::node_ops::resolve_hosts_expression(
75 infra.backend,
76 &ctx.token,
77 &expr,
78 false,
79 )
80 .await
81 .map_err(to_handler_error)?,
82 None => vec![],
83 };
84
85 let params = service::session::GetSessionParams {
86 hsm_group: q.hsm_group,
87 xnames,
88 min_age: q.min_age,
89 max_age: q.max_age,
90 session_type: q.session_type,
91 status: q.status,
92 name: q.name,
93 limit: q.limit,
94 };
95
96 let sessions = service::session::get_sessions(&infra, &ctx.token, ¶ms)
97 .await
98 .map_err(to_handler_error)?;
99
100 Ok(Json(sessions))
101}
102
103#[derive(Deserialize, IntoParams)]
109pub struct DeleteSessionQuery {
110 #[serde(default)]
112 pub dry_run: bool,
113}
114
115#[utoipa::path(delete, path = "/sessions/{name}", tag = "sessions",
117 params(("name" = String, Path, description = "Session name"), DeleteSessionQuery, SiteHeader),
118 security(("bearerAuth" = [])),
119 responses(
120 (status = 200, description = "Session deleted or deletion preview", body = serde_json::Value),
121 (status = 401, description = "Unauthorized", body = ErrorResponse),
122 (status = 404, description = "Not found", body = ErrorResponse),
123 (status = 500, description = "Internal error", body = ErrorResponse),
124 )
125)]
126#[tracing::instrument(skip_all)]
127pub async fn delete_session(
128 ctx: RequestCtx,
129 Path(name): Path<String>,
130 Query(q): Query<DeleteSessionQuery>,
131) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
132 tracing::info!("delete_session name={} dry_run={}", name, q.dry_run);
133 let infra = ctx.infra();
134
135 let deletion_ctx =
136 service::session::prepare_session_deletion(&infra, &ctx.token, &name, None)
137 .await
138 .map_err(to_handler_error)?;
139
140 if q.dry_run {
141 return Ok((StatusCode::OK, Json(serialize_or_500(&deletion_ctx)?)));
142 }
143
144 service::session::execute_session_deletion(
145 &infra,
146 &ctx.token,
147 &deletion_ctx,
148 false,
149 )
150 .await
151 .map_err(to_handler_error)?;
152
153 Ok((StatusCode::OK, Json(serde_json::json!({ "deleted": name }))))
154}
155
156#[derive(Deserialize, ToSchema)]
162pub struct CreateSessionRequest {
163 pub cfs_conf_sess_name: Option<String>,
165 pub playbook_yaml_file_name: Option<String>,
167 pub hsm_group: Option<String>,
169 pub repo_names: Vec<String>,
171 pub repo_last_commit_ids: Vec<String>,
173 pub ansible_limit: Option<String>,
175 pub ansible_verbosity: Option<String>,
177 pub ansible_passthrough: Option<String>,
179}
180
181#[utoipa::path(post, path = "/sessions", tag = "sessions",
183 params(SiteHeader),
184 request_body = CreateSessionRequest,
185 security(("bearerAuth" = [])),
186 responses(
187 (status = 201, description = "Session created", body = serde_json::Value),
188 (status = 400, description = "Bad request", body = ErrorResponse),
189 (status = 401, description = "Unauthorized", body = ErrorResponse),
190 (status = 500, description = "Internal error", body = ErrorResponse),
191 (status = 501, description = "Vault not configured", body = ErrorResponse),
192 )
193)]
194#[tracing::instrument(skip_all)]
195pub async fn create_session(
196 ctx: RequestCtx,
197 Json(body): Json<CreateSessionRequest>,
198) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
199 validate_repo_list_lengths(&body.repo_names, &body.repo_last_commit_ids)?;
200 tracing::info!("create_session repos={:?}", body.repo_names);
201 let infra = ctx.infra();
202
203 if let Some(ref hsm_group) = body.hsm_group {
205 service::group::validate_hsm_group_access(&infra, &ctx.token, hsm_group)
206 .await
207 .map_err(to_handler_error)?;
208 }
209 if let Some(ref ansible_limit) = body.ansible_limit {
212 let xnames: Vec<String> = ansible_limit
213 .split(',')
214 .map(|s| s.trim().to_string())
215 .collect();
216 crate::server::common::authorization::validate_target_hsm_members(
217 infra.backend,
218 &ctx.token,
219 &xnames,
220 )
221 .await
222 .map_err(to_handler_error)?;
223 }
224
225 let vault_base_url = require_vault(infra.vault_base_url)?;
226
227 let gitea_token =
228 crate::server::common::vault::http_client::fetch_shasta_vcs_token(
229 &ctx.token,
230 vault_base_url,
231 infra.site_name,
232 )
233 .await
234 .map_err(to_handler_error)?;
235
236 let repo_name_refs: Vec<&str> = body
237 .repo_names
238 .iter()
239 .map(std::string::String::as_str)
240 .collect();
241 let repo_commit_refs: Vec<&str> = body
242 .repo_last_commit_ids
243 .iter()
244 .map(std::string::String::as_str)
245 .collect();
246
247 let (session_name, config_name) = service::session::create_cfs_session(
248 &infra,
249 &ctx.token,
250 &gitea_token,
251 body.cfs_conf_sess_name.as_deref(),
252 body.playbook_yaml_file_name.as_deref(),
253 body.hsm_group.as_deref(),
254 &repo_name_refs,
255 &repo_commit_refs,
256 body.ansible_limit.as_deref(),
257 body.ansible_verbosity.as_deref(),
258 body.ansible_passthrough.as_deref(),
259 )
260 .await
261 .map_err(to_handler_error)?;
262
263 Ok((
264 StatusCode::CREATED,
265 Json(serde_json::json!({
266 "session_name": session_name,
267 "configuration_name": config_name,
268 })),
269 ))
270}
271
272#[derive(Deserialize, IntoParams)]
278pub struct SessionLogsQuery {
279 #[serde(default)]
281 pub timestamps: bool,
282}
283
284#[utoipa::path(get, path = "/sessions/{name}/logs", tag = "sessions",
286 params(("name" = String, Path, description = "Session name"), SessionLogsQuery, SiteHeader),
287 security(("bearerAuth" = [])),
288 responses(
289 (status = 200, description = "SSE log stream"),
290 (status = 401, description = "Unauthorized", body = ErrorResponse),
291 (status = 500, description = "Internal error", body = ErrorResponse),
292 (status = 501, description = "Vault or k8s not configured", body = ErrorResponse),
293 )
294)]
295#[tracing::instrument(skip_all)]
296pub async fn get_session_logs(
297 ctx: RequestCtx,
298 Path(name): Path<String>,
299 Query(q): Query<SessionLogsQuery>,
300) -> Result<
301 Sse<impl futures::Stream<Item = Result<Event, Infallible>>>,
302 (StatusCode, Json<ErrorResponse>),
303> {
304 let infra = ctx.infra();
305
306 let k8s_api_url = require_k8s_url(infra.k8s_api_url)?;
307 let vault_base_url = require_vault(infra.vault_base_url)?;
308
309 let k8s = K8sDetails {
310 api_url: k8s_api_url.to_string(),
311 authentication: K8sAuth::Vault {
312 base_url: vault_base_url.to_string(),
313 },
314 };
315
316 let logs_stream = infra
317 .backend
318 .get_session_logs_stream(
319 &ctx.token,
320 infra.site_name,
321 &name,
322 q.timestamps,
323 &k8s,
324 )
325 .await
326 .map_err(to_handler_error)?;
327
328 let sse_stream = logs_stream.lines().map(|result| {
329 Ok::<Event, Infallible>(
330 Event::default().data(result.unwrap_or_else(|e| format!("error: {e}"))),
331 )
332 });
333
334 Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
335}