1use std::convert::Infallible;
4
5use axum::{
6 Json,
7 extract::{Path, Query},
8 http::StatusCode,
9 response::{
10 IntoResponse,
11 sse::{Event, KeepAlive, Sse},
12 },
13};
14use futures::{AsyncBufReadExt, StreamExt};
15use manta_backend_dispatcher::interfaces::cfs::CfsTrait;
16use manta_backend_dispatcher::types::{K8sAuth, K8sDetails};
17
18use super::{
19 ErrorResponse, RequestCtx, SiteHeader, require_k8s_url, require_vault,
20 serialize_or_500, to_handler_error, validate_repo_list_lengths,
21};
22use crate::service;
23
24pub use manta_shared::types::api::queries::{DeleteSessionQuery, SessionQuery};
29
30#[utoipa::path(get, path = "/sessions", tag = "sessions",
32 params(SessionQuery, SiteHeader),
33 security(("bearerAuth" = [])),
34 responses(
35 (status = 200, description = "List of sessions", body = serde_json::Value),
38 (status = 400, description = "Bad request", body = ErrorResponse),
39 (status = 401, description = "Unauthorized", body = ErrorResponse),
40 (status = 500, description = "Internal error", body = ErrorResponse),
41 )
42)]
43#[tracing::instrument(skip_all)]
44pub async fn get_sessions(
45 ctx: RequestCtx,
46 Query(q): Query<SessionQuery>,
47) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
48 let infra = ctx.infra();
49
50 let xnames: Vec<String> = q
51 .xnames
52 .map(|s| {
53 s.split(',')
54 .map(str::trim)
55 .filter(|v| !v.is_empty())
56 .map(str::to_string)
57 .collect()
58 })
59 .unwrap_or_default();
60
61 let params = service::session::GetSessionParams {
62 group: q.hsm_group,
63 xnames,
64 min_age: q.min_age,
65 max_age: q.max_age,
66 session_type: q.session_type,
67 status: q.status,
68 name: q.name,
69 limit: q.limit,
70 };
71
72 let sessions = service::session::get_sessions(&infra, &ctx.token, ¶ms)
73 .await
74 .map_err(to_handler_error)?;
75
76 Ok(Json(sessions))
77}
78
79#[utoipa::path(delete, path = "/sessions/{name}", tag = "sessions",
85 params(("name" = String, Path, description = "Session name"), DeleteSessionQuery, SiteHeader),
86 security(("bearerAuth" = [])),
87 responses(
88 (status = 200, description = "Session deleted or deletion preview", body = serde_json::Value),
90 (status = 401, description = "Unauthorized", body = ErrorResponse),
91 (status = 404, description = "Not found", body = ErrorResponse),
92 (status = 500, description = "Internal error", body = ErrorResponse),
93 )
94)]
95#[tracing::instrument(skip_all)]
96pub async fn delete_session(
97 ctx: RequestCtx,
98 Path(name): Path<String>,
99 Query(q): Query<DeleteSessionQuery>,
100) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
101 tracing::info!("delete_session name={} dry_run={}", name, q.dry_run);
102 let infra = ctx.infra();
103
104 let deletion_ctx =
105 service::session::prepare_session_deletion(&infra, &ctx.token, &name, None)
106 .await
107 .map_err(to_handler_error)?;
108
109 if q.dry_run {
110 return Ok((StatusCode::OK, Json(serialize_or_500(&deletion_ctx)?)));
111 }
112
113 service::session::execute_session_deletion(
114 &infra,
115 &ctx.token,
116 &deletion_ctx,
117 false,
118 )
119 .await
120 .map_err(to_handler_error)?;
121
122 Ok((StatusCode::OK, Json(serde_json::json!({ "deleted": name }))))
123}
124
125pub use manta_shared::types::api::session::CreateSessionRequest;
130
131#[utoipa::path(post, path = "/sessions", tag = "sessions",
133 params(SiteHeader),
134 request_body = CreateSessionRequest,
135 security(("bearerAuth" = [])),
136 responses(
137 (status = 201, description = "Session created", body = manta_shared::types::api::responses::CreateSessionResponse),
138 (status = 400, description = "Bad request", body = ErrorResponse),
139 (status = 401, description = "Unauthorized", body = ErrorResponse),
140 (status = 500, description = "Internal error", body = ErrorResponse),
141 (status = 501, description = "Vault not configured", body = ErrorResponse),
142 )
143)]
144#[tracing::instrument(skip_all)]
145pub async fn create_session(
146 ctx: RequestCtx,
147 Json(body): Json<CreateSessionRequest>,
148) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
149 validate_repo_list_lengths(&body.repo_names, &body.repo_last_commit_ids)?;
150 tracing::info!("create_session repos={:?}", body.repo_names);
151 let infra = ctx.infra();
152
153 if let Some(ref hsm_group) = body.hsm_group {
155 service::authorization::validate_user_group_access(
156 &infra, &ctx.token, hsm_group,
157 )
158 .await
159 .map_err(to_handler_error)?;
160 }
161
162 if let Some(ref ansible_limit) = body.ansible_limit {
165 service::authorization::validate_ansible_limit_membership_access(
166 &infra,
167 &ctx.token,
168 ansible_limit,
169 )
170 .await
171 .map_err(to_handler_error)?;
172 }
173
174 let vault_base_url = require_vault(infra.vault_base_url)?;
175
176 let gitea_token =
177 crate::server::common::vault::http_client::get_shasta_vcs_token(
178 &ctx.token,
179 vault_base_url,
180 infra.site_name,
181 )
182 .await
183 .map_err(to_handler_error)?;
184
185 let repo_name_refs: Vec<&str> = body
186 .repo_names
187 .iter()
188 .map(std::string::String::as_str)
189 .collect();
190 let repo_commit_refs: Vec<&str> = body
191 .repo_last_commit_ids
192 .iter()
193 .map(std::string::String::as_str)
194 .collect();
195
196 let (session_name, config_name) = service::session::create_cfs_session(
197 &infra,
198 &ctx.token,
199 &gitea_token,
200 service::session::CreateCfsSessionParams {
201 cfs_conf_sess_name: body.cfs_conf_sess_name.as_deref(),
202 playbook_yaml_file_name: body.playbook_yaml_file_name.as_deref(),
203 group: body.hsm_group.as_deref(),
204 repo_names: &repo_name_refs,
205 repo_last_commit_ids: &repo_commit_refs,
206 ansible_limit: body.ansible_limit.as_deref(),
207 ansible_verbosity: body.ansible_verbosity.as_deref(),
208 ansible_passthrough: body.ansible_passthrough.as_deref(),
209 },
210 )
211 .await
212 .map_err(to_handler_error)?;
213
214 Ok((
215 StatusCode::CREATED,
216 Json(serde_json::json!({
217 "session_name": session_name,
218 "configuration_name": config_name,
219 })),
220 ))
221}
222
223pub use manta_shared::types::api::queries::SessionLogsQuery;
228
229#[utoipa::path(get, path = "/sessions/{name}/logs", tag = "sessions",
231 params(("name" = String, Path, description = "Session name"), SessionLogsQuery, SiteHeader),
232 security(("bearerAuth" = [])),
233 responses(
234 (status = 200, description = "SSE log stream"),
235 (status = 401, description = "Unauthorized", body = ErrorResponse),
236 (status = 500, description = "Internal error", body = ErrorResponse),
237 (status = 501, description = "Vault or k8s not configured", body = ErrorResponse),
238 )
239)]
240#[tracing::instrument(skip_all)]
241pub async fn get_session_logs(
242 ctx: RequestCtx,
243 Path(name): Path<String>,
244 Query(q): Query<SessionLogsQuery>,
245) -> Result<
246 Sse<impl futures::Stream<Item = Result<Event, Infallible>>>,
247 (StatusCode, Json<ErrorResponse>),
248> {
249 let infra = ctx.infra();
250
251 let k8s_api_url = require_k8s_url(infra.k8s_api_url)?;
252 let vault_base_url = require_vault(infra.vault_base_url)?;
253
254 service::session::validate_session_access(&infra, &ctx.token, &name)
260 .await
261 .map_err(to_handler_error)?;
262
263 let k8s = K8sDetails {
264 api_url: k8s_api_url.to_string(),
265 authentication: K8sAuth::Vault {
266 base_url: vault_base_url.to_string(),
267 },
268 };
269
270 let logs_stream = infra
271 .backend
272 .get_session_logs_stream(
273 &ctx.token,
274 infra.site_name,
275 &name,
276 q.timestamps,
277 &k8s,
278 )
279 .await
280 .map_err(to_handler_error)?;
281
282 let sse_stream = logs_stream.lines().map(|result| {
283 Ok::<Event, Infallible>(
284 Event::default().data(result.unwrap_or_else(|e| format!("error: {e}"))),
285 )
286 });
287
288 Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
289}