diff --git a/Cargo.lock b/Cargo.lock index 038190c..16c0081 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3548,6 +3548,7 @@ dependencies = [ "actix-multipart", "actix-web", "chrono", + "constant_time_eq", "convert_case 0.6.0", "dotenvy", "flate2", diff --git a/registry/.env.example b/registry/.env.example index dba0403..b17c136 100644 --- a/registry/.env.example +++ b/registry/.env.example @@ -6,10 +6,32 @@ GITHUB_PAT= # personal access token of github account with push access COMMITTER_GIT_NAME= # name of the committer used for index updates COMMITTER_GIT_EMAIL= # email of the committer used for index updates -S3_ENDPOINT= # endpoint of the s3 bucket -S3_BUCKET_NAME= # name of the s3 bucket -S3_REGION= # region of the s3 bucket -S3_ACCESS_KEY= # access key of the s3 bucket -S3_SECRET_KEY= # secret key of the s3 bucket +# AUTHENTICATION CONFIGURATION +# Set the variables of the authentication you want to use in order to enable it + +# Single Token +ACCESS_TOKEN= # a single token that is used to authenticate all publish requests + +# Read/Write Tokens +READ_ACCESS_TOKEN= # a token that is used to authenticate read requests +WRITE_ACCESS_TOKEN= # a token that is used to authenticate write requests + +# GitHub +GITHUB_AUTH= # set to any value to enable GitHub authentication + +# If none of the above is set, no authentication is required, even for write requests + +# STORAGE CONFIGURATION +# Set the variables of the storage you want to use in order to enable it + +# S3 +S3_ENDPOINT= # endpoint of the S3 bucket +S3_BUCKET_NAME= # name of the S3 bucket +S3_REGION= # region of the S3 bucket +S3_ACCESS_KEY= # access key of the S3 bucket +S3_SECRET_KEY= # secret key of the S3 bucket + +# FS +FS_STORAGE_ROOT= # root directory of the filesystem storage SENTRY_URL= # optional url of sentry error tracking \ No newline at end of file diff --git a/registry/Cargo.toml b/registry/Cargo.toml index dbd95a8..bd52a5f 100644 --- a/registry/Cargo.toml +++ b/registry/Cargo.toml @@ -34,6 +34,7 @@ sha2 = "0.10.8" rusty-s3 = "0.5.0" reqwest = { version = "0.12.7", features = ["json", "rustls-tls"] } +constant_time_eq = "0.3.1" tar = "0.4.41" flate2 = "1.0.33" diff --git a/registry/src/auth.rs b/registry/src/auth.rs deleted file mode 100644 index b97fad3..0000000 --- a/registry/src/auth.rs +++ /dev/null @@ -1,95 +0,0 @@ -use crate::AppState; -use actix_governor::{KeyExtractor, SimpleKeyExtractionError}; -use actix_web::{ - body::MessageBody, - dev::{ServiceRequest, ServiceResponse}, - error::Error as ActixError, - http::header::AUTHORIZATION, - middleware::Next, - web, HttpMessage, HttpResponse, -}; -use serde::Deserialize; - -#[derive(Debug, Copy, Clone, Hash, PartialOrd, PartialEq, Eq, Ord)] -pub struct UserId(pub u64); - -#[derive(Debug, Deserialize)] -struct UserResponse { - id: u64, -} - -pub async fn authentication( - app_state: web::Data, - req: ServiceRequest, - next: Next, -) -> Result, ActixError> { - let token = match req - .headers() - .get(AUTHORIZATION) - .map(|token| token.to_str().unwrap()) - { - Some(token) => token, - None => { - return Ok(req - .into_response(HttpResponse::Unauthorized().finish()) - .map_into_right_body()) - } - }; - - let token = if token.to_lowercase().starts_with("bearer ") { - token.to_string() - } else { - format!("Bearer {token}") - }; - - let response = match app_state - .reqwest_client - .get("https://api.github.com/user") - .header(reqwest::header::AUTHORIZATION, token) - .send() - .await - .and_then(|res| res.error_for_status()) - { - Ok(response) => response, - Err(e) if 
e.status() == Some(reqwest::StatusCode::UNAUTHORIZED) => {
-            return Ok(req
-                .into_response(HttpResponse::Unauthorized().finish())
-                .map_into_right_body())
-        }
-        Err(e) => {
-            log::error!("failed to get user: {e}");
-            return Ok(req
-                .into_response(HttpResponse::InternalServerError().finish())
-                .map_into_right_body());
-        }
-    };
-
-    let user_id = match response.json::<UserResponse>().await {
-        Ok(user) => user.id,
-        Err(_) => {
-            return Ok(req
-                .into_response(HttpResponse::Unauthorized().finish())
-                .map_into_right_body())
-        }
-    };
-
-    req.extensions_mut().insert(UserId(user_id));
-
-    let res = next.call(req).await?;
-    Ok(res.map_into_left_body())
-}
-
-#[derive(Debug, Clone)]
-pub struct UserIdExtractor;
-
-impl KeyExtractor for UserIdExtractor {
-    type Key = UserId;
-    type KeyExtractionError = SimpleKeyExtractionError<&'static str>;
-
-    fn extract(&self, req: &ServiceRequest) -> Result<Self::Key, Self::KeyExtractionError> {
-        match req.extensions().get::<UserId>() {
-            Some(user_id) => Ok(*user_id),
-            None => Err(SimpleKeyExtractionError::new("UserId not found")),
-        }
-    }
-}
diff --git a/registry/src/auth/github.rs b/registry/src/auth/github.rs
new file mode 100644
index 0000000..aa39d22
--- /dev/null
+++ b/registry/src/auth/github.rs
@@ -0,0 +1,54 @@
+use crate::auth::{get_token_from_req, AuthImpl, UserId};
+use actix_web::{dev::ServiceRequest, Error as ActixError};
+use serde::Deserialize;
+use std::fmt::Display;
+
+#[derive(Debug)]
+pub struct GitHubAuth {
+    pub reqwest_client: reqwest::Client,
+}
+
+impl AuthImpl for GitHubAuth {
+    async fn for_write_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
+        let token = match get_token_from_req(req, true) {
+            Some(token) => token,
+            None => return Ok(None),
+        };
+
+        let response = match self
+            .reqwest_client
+            .get("https://api.github.com/user")
+            .header(reqwest::header::AUTHORIZATION, token)
+            .send()
+            .await
+            .and_then(|res| res.error_for_status())
+        {
+            Ok(response) => response,
+            Err(e) => {
+                log::error!("failed to get user: {e}");
+                return Ok(None);
+            }
+        };
+
+        let user_id = match response.json::<UserResponse>().await {
+            Ok(user) => user.id,
+            Err(e) => {
+                log::error!("failed to get user: {e}");
+                return Ok(None);
+            }
+        };
+
+        Ok(Some(UserId(user_id)))
+    }
+}
+
+impl Display for GitHubAuth {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "GitHub")
+    }
+}
+
+#[derive(Debug, Deserialize)]
+struct UserResponse {
+    id: u64,
+}
diff --git a/registry/src/auth/mod.rs b/registry/src/auth/mod.rs
new file mode 100644
index 0000000..b047ca4
--- /dev/null
+++ b/registry/src/auth/mod.rs
@@ -0,0 +1,188 @@
+mod github;
+mod none;
+mod rw_token;
+mod token;
+
+use crate::{benv, make_reqwest, AppState};
+use actix_governor::{KeyExtractor, SimpleKeyExtractionError};
+use actix_web::{
+    body::MessageBody,
+    dev::{ServiceRequest, ServiceResponse},
+    error::Error as ActixError,
+    http::header::AUTHORIZATION,
+    middleware::Next,
+    web, HttpMessage, HttpResponse,
+};
+use sha2::{Digest, Sha256};
+use std::fmt::Display;
+
+#[derive(Debug, Copy, Clone, Hash, PartialOrd, PartialEq, Eq, Ord)]
+pub struct UserId(pub u64);
+
+impl UserId {
+    // there isn't any account on GitHub that has the ID 0, so it should be safe to use it
+    pub const DEFAULT: UserId = UserId(0);
+}
+
+#[derive(Debug, Clone)]
+pub struct UserIdExtractor;
+
+impl KeyExtractor for UserIdExtractor {
+    type Key = UserId;
+    type KeyExtractionError = SimpleKeyExtractionError<&'static str>;
+
+    fn extract(&self, req: &ServiceRequest) -> Result<Self::Key, Self::KeyExtractionError> {
+        match req.extensions().get::<UserId>() {
+            Some(user_id) => Ok(*user_id),
+            None => Err(SimpleKeyExtractionError::new("UserId not found")),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum Auth {
+    GitHub(github::GitHubAuth),
+    None(none::NoneAuth),
+    Token(token::TokenAuth),
+    RwToken(rw_token::RwTokenAuth),
+}
+
+pub trait AuthImpl: Display {
+    async fn for_write_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError>;
+
+    async fn for_read_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
+        self.for_write_request(req).await
+    }
+
+    fn read_needs_auth(&self) -> bool {
+        false
+    }
+}
+
+impl AuthImpl for Auth {
+    async fn for_write_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
+        match self {
+            Auth::GitHub(github) => github.for_write_request(req).await,
+            Auth::None(none) => none.for_write_request(req).await,
+            Auth::Token(token) => token.for_write_request(req).await,
+            Auth::RwToken(rw_token) => rw_token.for_write_request(req).await,
+        }
+    }
+
+    async fn for_read_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
+        match self {
+            Auth::GitHub(github) => github.for_read_request(req).await,
+            Auth::None(none) => none.for_write_request(req).await,
+            Auth::Token(token) => token.for_write_request(req).await,
+            Auth::RwToken(rw_token) => rw_token.for_read_request(req).await,
+        }
+    }
+
+    fn read_needs_auth(&self) -> bool {
+        match self {
+            Auth::GitHub(github) => github.read_needs_auth(),
+            Auth::None(none) => none.read_needs_auth(),
+            Auth::Token(token) => token.read_needs_auth(),
+            Auth::RwToken(rw_token) => rw_token.read_needs_auth(),
+        }
+    }
+}
+
+impl Display for Auth {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Auth::GitHub(github) => write!(f, "{}", github),
+            Auth::None(none) => write!(f, "{}", none),
+            Auth::Token(token) => write!(f, "{}", token),
+            Auth::RwToken(rw_token) => write!(f, "{}", rw_token),
+        }
+    }
+}
+
+pub async fn write_mw(
+    app_state: web::Data<AppState>,
+    req: ServiceRequest,
+    next: Next<impl MessageBody + 'static>,
+) -> Result<ServiceResponse<impl MessageBody>, ActixError> {
+    let user_id = match app_state.auth.for_write_request(&req).await? {
+        Some(user_id) => user_id,
+        None => {
+            return Ok(req
+                .into_response(HttpResponse::Unauthorized().finish())
+                .map_into_right_body())
+        }
+    };
+
+    req.extensions_mut().insert(user_id);
+
+    next.call(req).await.map(|res| res.map_into_left_body())
+}
+
+pub async fn read_mw(
+    app_state: web::Data<AppState>,
+    req: ServiceRequest,
+    next: Next<impl MessageBody + 'static>,
+) -> Result<ServiceResponse<impl MessageBody>, ActixError> {
+    if app_state.auth.read_needs_auth() {
+        let user_id = match app_state.auth.for_read_request(&req).await? {
+            Some(user_id) => user_id,
+            None => {
+                return Ok(req
+                    .into_response(HttpResponse::Unauthorized().finish())
+                    .map_into_right_body())
+            }
+        };
+
+        req.extensions_mut().insert(Some(user_id));
+    } else {
+        req.extensions_mut().insert(None::<UserId>);
+    }
+
+    next.call(req).await.map(|res| res.map_into_left_body())
+}
+
+pub fn get_auth_from_env() -> Auth {
+    if let Ok(token) = benv!("ACCESS_TOKEN") {
+        Auth::Token(token::TokenAuth {
+            token: *Sha256::digest(token.as_bytes()).as_ref(),
+        })
+    } else if benv!("GITHUB_AUTH").is_ok() {
+        Auth::GitHub(github::GitHubAuth {
+            reqwest_client: make_reqwest(),
+        })
+    } else if let Ok((r, w)) =
+        benv!("READ_ACCESS_TOKEN").and_then(|r| benv!("WRITE_ACCESS_TOKEN").map(|w| (r, w)))
+    {
+        Auth::RwToken(rw_token::RwTokenAuth {
+            read_token: *Sha256::digest(r.as_bytes()).as_ref(),
+            write_token: *Sha256::digest(w.as_bytes()).as_ref(),
+        })
+    } else {
+        Auth::None(none::NoneAuth)
+    }
+}
+
+pub fn get_token_from_req(req: &ServiceRequest, bearer: bool) -> Option<String> {
+    let token = match req
+        .headers()
+        .get(AUTHORIZATION)
+        .and_then(|token| token.to_str().ok())
+    {
+        Some(token) => token,
+        None => return None,
+    };
+
+    let token = if bearer {
+        if token.to_lowercase().starts_with("bearer ") {
+            token.to_string()
+        } else {
+            format!("Bearer {token}")
+        }
+    } else if token.to_lowercase().starts_with("bearer ") {
+        token[7..].to_string()
+    } else {
+        token.to_string()
+    };
+
+    Some(token)
+}
diff --git a/registry/src/auth/none.rs b/registry/src/auth/none.rs
new file mode 100644
index 0000000..c03f298
--- /dev/null
+++ b/registry/src/auth/none.rs
@@ -0,0 +1,18 @@
+use crate::auth::{AuthImpl, UserId};
+use actix_web::{dev::ServiceRequest, Error as ActixError};
+use std::fmt::Display;
+
+#[derive(Debug)]
+pub struct NoneAuth;
+
+impl AuthImpl for NoneAuth {
+    async fn for_write_request(&self, _req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
+        Ok(Some(UserId::DEFAULT))
+    }
+}
+
+impl Display for NoneAuth {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "None")
+    }
+}
diff --git a/registry/src/auth/rw_token.rs b/registry/src/auth/rw_token.rs
new file mode 100644
index 0000000..4798272
--- /dev/null
+++ b/registry/src/auth/rw_token.rs
@@ -0,0 +1,53 @@
+use crate::auth::{get_token_from_req, AuthImpl, UserId};
+use actix_web::{dev::ServiceRequest, Error as ActixError};
+use constant_time_eq::constant_time_eq_32;
+use sha2::{Digest, Sha256};
+use std::fmt::Display;
+
+#[derive(Debug)]
+pub struct RwTokenAuth {
+    pub read_token: [u8; 32],
+    pub write_token: [u8; 32],
+}
+
+impl AuthImpl for RwTokenAuth {
+    async fn for_write_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
+        let token = match get_token_from_req(req, false) {
+            Some(token) => token,
+            None => return Ok(None),
+        };
+
+        let token: [u8; 32] = Sha256::digest(token.as_bytes()).into();
+
+        Ok(if constant_time_eq_32(&self.write_token, &token) {
+            Some(UserId::DEFAULT)
+        } else {
+            None
+        })
+    }
+
+    async fn for_read_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
+        let token = match get_token_from_req(req, false) {
+            Some(token) => token,
+            None => return Ok(None),
+        };
+
+        let token: [u8; 32] = Sha256::digest(token.as_bytes()).into();
+
+        Ok(if constant_time_eq_32(&self.read_token, &token) {
+            Some(UserId::DEFAULT)
+        } else {
+            None
+        })
+    }
+
+    fn read_needs_auth(&self) -> bool {
+        true
+    }
+}
+
+impl Display for RwTokenAuth {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "RwToken")
+    }
+}
diff --git
a/registry/src/auth/token.rs b/registry/src/auth/token.rs new file mode 100644 index 0000000..bec311b --- /dev/null +++ b/registry/src/auth/token.rs @@ -0,0 +1,34 @@ +use crate::auth::{get_token_from_req, AuthImpl, UserId}; +use actix_web::{dev::ServiceRequest, Error as ActixError}; +use constant_time_eq::constant_time_eq_32; +use sha2::{Digest, Sha256}; +use std::fmt::Display; + +#[derive(Debug)] +pub struct TokenAuth { + // needs to be an SHA-256 hash + pub token: [u8; 32], +} + +impl AuthImpl for TokenAuth { + async fn for_write_request(&self, req: &ServiceRequest) -> Result, ActixError> { + let token = match get_token_from_req(req, false) { + Some(token) => token, + None => return Ok(None), + }; + + let token: [u8; 32] = Sha256::digest(token.as_bytes()).into(); + + Ok(if constant_time_eq_32(&self.token, &token) { + Some(UserId::DEFAULT) + } else { + None + }) + } +} + +impl Display for TokenAuth { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Token") + } +} diff --git a/registry/src/endpoints/package_version.rs b/registry/src/endpoints/package_version.rs index 997ad31..cefe7dc 100644 --- a/registry/src/endpoints/package_version.rs +++ b/registry/src/endpoints/package_version.rs @@ -1,16 +1,8 @@ -use actix_web::{ - http::header::{ACCEPT, LOCATION}, - web, HttpRequest, HttpResponse, Responder, -}; -use rusty_s3::{actions::GetObject, S3Action}; +use actix_web::{http::header::ACCEPT, web, HttpRequest, HttpResponse, Responder}; use semver::Version; use serde::{Deserialize, Deserializer}; -use crate::{ - error::Error, - package::{s3_doc_name, s3_name, PackageResponse, S3_SIGN_DURATION}, - AppState, -}; +use crate::{error::Error, package::PackageResponse, storage::StorageImpl, AppState}; use pesde::{ manifest::target::TargetKind, names::PackageName, @@ -136,16 +128,7 @@ pub async fn get_package_version( return Ok(HttpResponse::NotFound().finish()); }; - let object_url = GetObject::new( - &app_state.s3_bucket, - Some(&app_state.s3_credentials), - &s3_doc_name(&hash), - ) - .sign(S3_SIGN_DURATION); - - return Ok(HttpResponse::TemporaryRedirect() - .append_header((LOCATION, object_url.as_str())) - .finish()); + return app_state.storage.get_doc(&hash).await; } let accept = request @@ -159,16 +142,11 @@ pub async fn get_package_version( }); if let Some(readme) = accept { - let object_url = GetObject::new( - &app_state.s3_bucket, - Some(&app_state.s3_credentials), - &s3_name(&name, v_id, readme), - ) - .sign(S3_SIGN_DURATION); - - return Ok(HttpResponse::TemporaryRedirect() - .append_header((LOCATION, object_url.as_str())) - .finish()); + return if readme { + app_state.storage.get_readme(&name, v_id).await + } else { + app_state.storage.get_package(&name, v_id).await + }; } let response = PackageResponse { diff --git a/registry/src/endpoints/publish_version.rs b/registry/src/endpoints/publish_version.rs index 114ed1d..df04942 100644 --- a/registry/src/endpoints/publish_version.rs +++ b/registry/src/endpoints/publish_version.rs @@ -2,10 +2,8 @@ use actix_multipart::Multipart; use actix_web::{web, HttpResponse, Responder}; use convert_case::{Case, Casing}; use flate2::read::GzDecoder; -use futures::{future::join_all, StreamExt}; +use futures::{future::join_all, join, StreamExt}; use git2::{Remote, Repository, Signature}; -use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE}; -use rusty_s3::{actions::PutObject, S3Action}; use serde::Deserialize; use sha2::{Digest, Sha256}; use std::{ @@ -19,8 +17,8 @@ use crate::{ auth::UserId, benv, error::{Error, 
ErrorResponse}, - package::{s3_doc_name, s3_name, S3_SIGN_DURATION}, search::update_version, + storage::StorageImpl, AppState, }; use pesde::{ @@ -454,57 +452,29 @@ pub async fn publish_package( let version_id = VersionId::new(manifest.version.clone(), manifest.target.kind()); - join_all( - std::iter::once({ - let object_url = PutObject::new( - &app_state.s3_bucket, - Some(&app_state.s3_credentials), - &s3_name(&manifest.name, &version_id, false), - ) - .sign(S3_SIGN_DURATION); - - app_state - .reqwest_client - .put(object_url) - .header(CONTENT_TYPE, "application/gzip") - .header(CONTENT_ENCODING, "gzip") - .body(bytes) - .send() - }) - .chain(docs_pages.into_iter().map(|(hash, content)| { - let object_url = PutObject::new( - &app_state.s3_bucket, - Some(&app_state.s3_credentials), - &s3_doc_name(&hash), - ) - .sign(S3_SIGN_DURATION); - - app_state - .reqwest_client - .put(object_url) - .header(CONTENT_TYPE, "text/plain") - .header(CONTENT_ENCODING, "gzip") - .body(content) - .send() - })) - .chain(readme.map(|readme| { - let object_url = PutObject::new( - &app_state.s3_bucket, - Some(&app_state.s3_credentials), - &s3_name(&manifest.name, &version_id, true), - ) - .sign(S3_SIGN_DURATION); - - app_state - .reqwest_client - .put(object_url) - .header(CONTENT_TYPE, "text/plain") - .header(CONTENT_ENCODING, "gzip") - .body(readme) - .send() - })), - ) - .await; + let (a, b, c) = join!( + app_state + .storage + .store_package(&manifest.name, &version_id, bytes.to_vec(),), + join_all( + docs_pages + .into_iter() + .map(|(hash, content)| app_state.storage.store_doc(hash, content)), + ), + async { + if let Some(readme) = readme { + app_state + .storage + .store_readme(&manifest.name, &version_id, readme) + .await + } else { + Ok(()) + } + } + ); + a?; + b.into_iter().collect::>()?; + c?; Ok(HttpResponse::Ok().body(format!( "published {}@{} {}", diff --git a/registry/src/main.rs b/registry/src/main.rs index 7d4c30e..ceeef8a 100644 --- a/registry/src/main.rs +++ b/registry/src/main.rs @@ -6,7 +6,6 @@ use actix_web::{ web, App, HttpServer, }; use log::info; -use rusty_s3::{Bucket, Credentials, UrlStyle}; use std::{env::current_dir, fs::create_dir_all, path::PathBuf, sync::Mutex}; use pesde::{ @@ -14,21 +13,35 @@ use pesde::{ AuthConfig, Project, }; -use crate::{auth::UserIdExtractor, search::make_search}; +use crate::{ + auth::{get_auth_from_env, Auth, UserIdExtractor}, + search::make_search, + storage::{get_storage_from_env, Storage}, +}; mod auth; mod endpoints; mod error; mod package; mod search; +mod storage; + +pub fn make_reqwest() -> reqwest::Client { + reqwest::ClientBuilder::new() + .user_agent(concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION") + )) + .build() + .unwrap() +} pub struct AppState { - pub s3_bucket: Bucket, - pub s3_credentials: Credentials, - pub source: Mutex, pub project: Project, - pub reqwest_client: reqwest::Client, + pub storage: Storage, + pub auth: Auth, pub search_reader: tantivy::IndexReader, pub search_writer: Mutex, @@ -91,28 +104,18 @@ async fn run(with_sentry: bool) -> std::io::Result<()> { let (search_reader, search_writer) = make_search(&project, &source); let app_data = web::Data::new(AppState { - s3_bucket: Bucket::new( - benv!(parse required "S3_ENDPOINT"), - UrlStyle::Path, - benv!(required "S3_BUCKET_NAME"), - benv!(required "S3_REGION"), - ) - .unwrap(), - s3_credentials: Credentials::new( - benv!(required "S3_ACCESS_KEY"), - benv!(required "S3_SECRET_KEY"), - ), - source: Mutex::new(source), project, - reqwest_client: 
reqwest::ClientBuilder::new() - .user_agent(concat!( - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION") - )) - .build() - .unwrap(), + storage: { + let storage = get_storage_from_env(); + info!("storage: {storage}"); + storage + }, + auth: { + let auth = get_auth_from_env(); + info!("auth: {auth}"); + auth + }, search_reader, search_writer: Mutex::new(search_writer), @@ -144,21 +147,30 @@ async fn run(with_sentry: bool) -> std::io::Result<()> { ) .service( web::scope("/v0") - .route("/search", web::get().to(endpoints::search::search_packages)) + .route( + "/search", + web::get() + .to(endpoints::search::search_packages) + .wrap(from_fn(auth::read_mw)), + ) .route( "/packages/{name}", - web::get().to(endpoints::package_versions::get_package_versions), + web::get() + .to(endpoints::package_versions::get_package_versions) + .wrap(from_fn(auth::read_mw)), ) .route( "/packages/{name}/{version}/{target}", - web::get().to(endpoints::package_version::get_package_version), + web::get() + .to(endpoints::package_version::get_package_version) + .wrap(from_fn(auth::read_mw)), ) .route( "/packages", web::post() .to(endpoints::publish_version::publish_package) .wrap(Governor::new(&publish_governor_config)) - .wrap(from_fn(auth::authentication)), + .wrap(from_fn(auth::write_mw)), ), ) }) diff --git a/registry/src/package.rs b/registry/src/package.rs index 6eb1045..93ad5e8 100644 --- a/registry/src/package.rs +++ b/registry/src/package.rs @@ -1,26 +1,7 @@ use chrono::{DateTime, Utc}; -use pesde::{ - manifest::target::{Target, TargetKind}, - names::PackageName, - source::version_id::VersionId, -}; +use pesde::manifest::target::{Target, TargetKind}; use serde::Serialize; -use std::{collections::BTreeSet, time::Duration}; - -pub const S3_SIGN_DURATION: Duration = Duration::from_secs(60 * 3); - -pub fn s3_name(name: &PackageName, version_id: &VersionId, is_readme: bool) -> String { - format!( - "{name}/{}/{}/{}.gz", - version_id.version(), - version_id.target(), - if is_readme { "readme" } else { "pkg.tar" }, - ) -} - -pub fn s3_doc_name(doc_hash: &str) -> String { - format!("doc/{}.gz", doc_hash) -} +use std::collections::BTreeSet; #[derive(Debug, Serialize, Eq, PartialEq)] pub struct TargetInfo { diff --git a/registry/src/storage/fs.rs b/registry/src/storage/fs.rs new file mode 100644 index 0000000..dd3f7dd --- /dev/null +++ b/registry/src/storage/fs.rs @@ -0,0 +1,126 @@ +use crate::{error::Error, storage::StorageImpl}; +use actix_web::{ + http::header::{CONTENT_ENCODING, CONTENT_TYPE}, + HttpResponse, +}; +use pesde::{names::PackageName, source::version_id::VersionId}; +use std::{fmt::Display, fs::create_dir_all, path::PathBuf}; + +#[derive(Debug)] +pub struct FSStorage { + pub root: PathBuf, +} + +impl StorageImpl for FSStorage { + async fn store_package( + &self, + package_name: &PackageName, + version: &VersionId, + contents: Vec, + ) -> Result<(), Error> { + let (scope, name) = package_name.as_str(); + + let path = self + .root + .join(scope) + .join(name) + .join(version.version().to_string()) + .join(version.target().to_string()); + create_dir_all(&path)?; + + std::fs::write(path.join("pkg.tar.gz"), &contents)?; + + Ok(()) + } + + async fn get_package( + &self, + package_name: &PackageName, + version: &VersionId, + ) -> Result { + let (scope, name) = package_name.as_str(); + + let path = self + .root + .join(scope) + .join(name) + .join(version.version().to_string()) + .join(version.target().to_string()); + + let contents = std::fs::read(path.join("pkg.tar.gz"))?; + + Ok(HttpResponse::Ok() + 
.append_header((CONTENT_TYPE, "application/gzip"))
+            .append_header((CONTENT_ENCODING, "gzip"))
+            .body(contents))
+    }
+
+    async fn store_readme(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+        contents: Vec<u8>,
+    ) -> Result<(), Error> {
+        let (scope, name) = package_name.as_str();
+
+        let path = self
+            .root
+            .join(scope)
+            .join(name)
+            .join(version.version().to_string())
+            .join(version.target().to_string());
+        create_dir_all(&path)?;
+
+        std::fs::write(path.join("readme.gz"), &contents)?;
+
+        Ok(())
+    }
+
+    async fn get_readme(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+    ) -> Result<HttpResponse, Error> {
+        let (scope, name) = package_name.as_str();
+
+        let path = self
+            .root
+            .join(scope)
+            .join(name)
+            .join(version.version().to_string())
+            .join(version.target().to_string());
+
+        let contents = std::fs::read(path.join("readme.gz"))?;
+
+        Ok(HttpResponse::Ok()
+            .append_header((CONTENT_TYPE, "text/plain"))
+            .append_header((CONTENT_ENCODING, "gzip"))
+            .body(contents))
+    }
+
+    async fn store_doc(&self, doc_hash: String, contents: Vec<u8>) -> Result<(), Error> {
+        let path = self.root.join("docs");
+        create_dir_all(&path)?;
+
+        std::fs::write(path.join(format!("{doc_hash}.gz")), &contents)?;
+
+        Ok(())
+    }
+
+    async fn get_doc(&self, doc_hash: &str) -> Result<HttpResponse, Error> {
+        let path = self.root.join("docs");
+
+        let contents = std::fs::read(path.join(format!("{doc_hash}.gz")))?;
+
+        Ok(HttpResponse::Ok()
+            .append_header((CONTENT_TYPE, "text/plain"))
+            .append_header((CONTENT_ENCODING, "gzip"))
+            .body(contents))
+    }
+}
+
+impl Display for FSStorage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "FS")
+    }
+}
diff --git a/registry/src/storage/mod.rs b/registry/src/storage/mod.rs
new file mode 100644
index 0000000..ff05962
--- /dev/null
+++ b/registry/src/storage/mod.rs
@@ -0,0 +1,141 @@
+use crate::{benv, error::Error, make_reqwest};
+use actix_web::HttpResponse;
+use pesde::{names::PackageName, source::version_id::VersionId};
+use rusty_s3::{Bucket, Credentials, UrlStyle};
+use std::fmt::Display;
+
+mod fs;
+mod s3;
+
+#[derive(Debug)]
+pub enum Storage {
+    S3(s3::S3Storage),
+    FS(fs::FSStorage),
+}
+
+pub trait StorageImpl: Display {
+    async fn store_package(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+        contents: Vec<u8>,
+    ) -> Result<(), crate::error::Error>;
+    async fn get_package(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+    ) -> Result<HttpResponse, crate::error::Error>;
+
+    async fn store_readme(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+        contents: Vec<u8>,
+    ) -> Result<(), crate::error::Error>;
+    async fn get_readme(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+    ) -> Result<HttpResponse, crate::error::Error>;
+
+    async fn store_doc(
+        &self,
+        doc_hash: String,
+        contents: Vec<u8>,
+    ) -> Result<(), crate::error::Error>;
+    async fn get_doc(&self, doc_hash: &str) -> Result<HttpResponse, crate::error::Error>;
+}
+
+impl StorageImpl for Storage {
+    async fn store_package(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+        contents: Vec<u8>,
+    ) -> Result<(), Error> {
+        match self {
+            Storage::S3(s3) => s3.store_package(package_name, version, contents).await,
+            Storage::FS(fs) => fs.store_package(package_name, version, contents).await,
+        }
+    }
+
+    async fn get_package(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+    ) -> Result<HttpResponse, Error> {
+        match self {
+            Storage::S3(s3) => s3.get_package(package_name, version).await,
+            Storage::FS(fs) => fs.get_package(package_name, version).await,
+        }
+    }
+
+    async fn store_readme(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+        contents: Vec<u8>,
+    ) -> Result<(), Error> {
+        match self {
+            Storage::S3(s3) => s3.store_readme(package_name, version, contents).await,
+            Storage::FS(fs) => fs.store_readme(package_name, version, contents).await,
+        }
+    }
+
+    async fn get_readme(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+    ) -> Result<HttpResponse, Error> {
+        match self {
+            Storage::S3(s3) => s3.get_readme(package_name, version).await,
+            Storage::FS(fs) => fs.get_readme(package_name, version).await,
+        }
+    }
+
+    async fn store_doc(&self, doc_hash: String, contents: Vec<u8>) -> Result<(), Error> {
+        match self {
+            Storage::S3(s3) => s3.store_doc(doc_hash, contents).await,
+            Storage::FS(fs) => fs.store_doc(doc_hash, contents).await,
+        }
+    }
+
+    async fn get_doc(&self, doc_hash: &str) -> Result<HttpResponse, Error> {
+        match self {
+            Storage::S3(s3) => s3.get_doc(doc_hash).await,
+            Storage::FS(fs) => fs.get_doc(doc_hash).await,
+        }
+    }
+}
+
+impl Display for Storage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Storage::S3(s3) => write!(f, "{}", s3),
+            Storage::FS(fs) => write!(f, "{}", fs),
+        }
+    }
+}
+
+pub fn get_storage_from_env() -> Storage {
+    if let Ok(endpoint) = benv!(parse "S3_ENDPOINT") {
+        Storage::S3(s3::S3Storage {
+            s3_bucket: Bucket::new(
+                endpoint,
+                UrlStyle::Path,
+                benv!(required "S3_BUCKET_NAME"),
+                benv!(required "S3_REGION"),
+            )
+            .unwrap(),
+            s3_credentials: Credentials::new(
+                benv!(required "S3_ACCESS_KEY"),
+                benv!(required "S3_SECRET_KEY"),
+            ),
+            reqwest_client: make_reqwest(),
+        })
+    } else if let Ok(root) = benv!(parse "FS_STORAGE_ROOT") {
+        Storage::FS(fs::FSStorage { root })
+    } else {
+        panic!("no storage backend configured")
+    }
+}
diff --git a/registry/src/storage/s3.rs b/registry/src/storage/s3.rs
new file mode 100644
index 0000000..5cfc64b
--- /dev/null
+++ b/registry/src/storage/s3.rs
@@ -0,0 +1,153 @@
+use crate::{error::Error, storage::StorageImpl};
+use actix_web::{http::header::LOCATION, HttpResponse};
+use pesde::{names::PackageName, source::version_id::VersionId};
+use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
+use rusty_s3::{actions::{GetObject, PutObject}, Bucket, Credentials, S3Action};
+use std::{fmt::Display, time::Duration};
+
+#[derive(Debug)]
+pub struct S3Storage {
+    pub s3_bucket: Bucket,
+    pub s3_credentials: Credentials,
+    pub reqwest_client: reqwest::Client,
+}
+
+pub const S3_SIGN_DURATION: Duration = Duration::from_secs(60 * 3);
+
+impl StorageImpl for S3Storage {
+    async fn store_package(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+        contents: Vec<u8>,
+    ) -> Result<(), Error> {
+        let object_url = PutObject::new(
+            &self.s3_bucket,
+            Some(&self.s3_credentials),
+            &format!(
+                "{package_name}/{}/{}/pkg.tar.gz",
+                version.version(),
+                version.target()
+            ),
+        )
+        .sign(S3_SIGN_DURATION);
+
+        self.reqwest_client
+            .put(object_url)
+            .header(CONTENT_TYPE, "application/gzip")
+            .header(CONTENT_ENCODING, "gzip")
+            .body(contents)
+            .send()
+            .await?;
+
+        Ok(())
+    }
+
+    async fn get_package(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+    ) -> Result<HttpResponse, Error> {
+        let object_url = GetObject::new(
+            &self.s3_bucket,
+            Some(&self.s3_credentials),
+            &format!(
+                "{package_name}/{}/{}/pkg.tar.gz",
+                version.version(),
+                version.target()
+            ),
+        )
+        .sign(S3_SIGN_DURATION);
+
+        Ok(HttpResponse::TemporaryRedirect()
+            .append_header((LOCATION, object_url.as_str()))
+            .finish())
+    }
+
+    async fn store_readme(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+        contents: Vec<u8>,
+    ) -> Result<(), Error> {
+        let object_url = PutObject::new(
+            &self.s3_bucket,
+            Some(&self.s3_credentials),
+            &format!(
+                "{package_name}/{}/{}/readme.gz",
+                version.version(),
+                version.target()
+            ),
+        )
+        .sign(S3_SIGN_DURATION);
+
+        self.reqwest_client
+            .put(object_url)
+            .header(CONTENT_TYPE, "text/plain")
+            .header(CONTENT_ENCODING, "gzip")
+            .body(contents)
+            .send()
+            .await?;
+
+        Ok(())
+    }
+
+    async fn get_readme(
+        &self,
+        package_name: &PackageName,
+        version: &VersionId,
+    ) -> Result<HttpResponse, Error> {
+        let object_url = GetObject::new(
+            &self.s3_bucket,
+            Some(&self.s3_credentials),
+            &format!(
+                "{package_name}/{}/{}/readme.gz",
+                version.version(),
+                version.target()
+            ),
+        )
+        .sign(S3_SIGN_DURATION);
+
+        Ok(HttpResponse::TemporaryRedirect()
+            .append_header((LOCATION, object_url.as_str()))
+            .finish())
+    }
+
+    async fn store_doc(&self, doc_hash: String, contents: Vec<u8>) -> Result<(), Error> {
+        let object_url = PutObject::new(
+            &self.s3_bucket,
+            Some(&self.s3_credentials),
+            &format!("doc/{}.gz", doc_hash),
+        )
+        .sign(S3_SIGN_DURATION);
+
+        self.reqwest_client
+            .put(object_url)
+            .header(CONTENT_TYPE, "text/plain")
+            .header(CONTENT_ENCODING, "gzip")
+            .body(contents)
+            .send()
+            .await?;
+
+        Ok(())
+    }
+
+    async fn get_doc(&self, doc_hash: &str) -> Result<HttpResponse, Error> {
+        let object_url = GetObject::new(
+            &self.s3_bucket,
+            Some(&self.s3_credentials),
+            &format!("doc/{}.gz", doc_hash),
+        )
+        .sign(S3_SIGN_DURATION);
+
+        Ok(HttpResponse::TemporaryRedirect()
+            .append_header((LOCATION, object_url.as_str()))
+            .finish())
+    }
+}
+
+impl Display for S3Storage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "S3")
+    }
+}
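
The token backends above never store or compare raw secrets: get_auth_from_env hashes the configured ACCESS_TOKEN / READ_ACCESS_TOKEN / WRITE_ACCESS_TOKEN with SHA-256 once at startup, and token.rs / rw_token.rs hash the value taken from the Authorization header and compare digests with constant_time_eq_32. A minimal standalone sketch of that scheme follows; the function names and the sample token are illustrative only, not part of the diff.

// Sketch of the token check used by TokenAuth/RwTokenAuth: compare SHA-256
// digests in constant time instead of comparing raw token strings.
use constant_time_eq::constant_time_eq_32;
use sha2::{Digest, Sha256};

// Hash a configured secret once at startup (mirrors get_auth_from_env).
fn hash_token(token: &str) -> [u8; 32] {
    Sha256::digest(token.as_bytes()).into()
}

// Check a presented token against the stored digest (mirrors for_write_request).
fn token_matches(stored: &[u8; 32], presented: &str) -> bool {
    let presented: [u8; 32] = Sha256::digest(presented.as_bytes()).into();
    constant_time_eq_32(stored, &presented)
}

fn main() {
    // Example ACCESS_TOKEN value an operator might set in .env; purely illustrative.
    let stored = hash_token("example-publish-token");

    assert!(token_matches(&stored, "example-publish-token"));
    assert!(!token_matches(&stored, "wrong-token"));
    println!("token check ok");
}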
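
The FS backend stores archives under FS_STORAGE_ROOT as {scope}/{name}/{version}/{target}/pkg.tar.gz, with readme.gz alongside and docs pages under docs/{hash}.gz. A small sketch of that path scheme, assuming an illustrative helper and sample values (none of which appear in the diff):

// Sketch of the on-disk layout used by FSStorage; names and values are illustrative.
use std::path::{Path, PathBuf};

fn package_dir(root: &Path, scope: &str, name: &str, version: &str, target: &str) -> PathBuf {
    root.join(scope).join(name).join(version).join(target)
}

fn main() {
    let root = Path::new("/var/lib/pesde-registry");

    // Locations written by store_package / store_readme for one published version.
    let dir = package_dir(root, "acme", "example-lib", "1.2.3", "luau");
    println!("{}", dir.join("pkg.tar.gz").display());
    println!("{}", dir.join("readme.gz").display());

    // Location written by store_doc, keyed by the doc's hash.
    println!("{}", root.join("docs").join("0123abcd.gz").display());
}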