feat(registry): make storage & auth customisable

This commit is contained in:
daimond113 2024-09-05 21:57:47 +02:00
parent 702153d81b
commit 30c4d0c391
No known key found for this signature in database
GPG key ID: 3A8ECE51328B513C
16 changed files with 873 additions and 236 deletions

1
Cargo.lock generated
View file

@ -3548,6 +3548,7 @@ dependencies = [
"actix-multipart",
"actix-web",
"chrono",
"constant_time_eq",
"convert_case 0.6.0",
"dotenvy",
"flate2",

View file

@ -6,10 +6,32 @@ GITHUB_PAT= # personal access token of github account with push access
COMMITTER_GIT_NAME= # name of the committer used for index updates
COMMITTER_GIT_EMAIL= # email of the committer used for index updates
S3_ENDPOINT= # endpoint of the s3 bucket
S3_BUCKET_NAME= # name of the s3 bucket
S3_REGION= # region of the s3 bucket
S3_ACCESS_KEY= # access key of the s3 bucket
S3_SECRET_KEY= # secret key of the s3 bucket
# AUTHENTICATION CONFIGURATION
# Set the variables of the authentication you want to use in order to enable it
# Single Token
ACCESS_TOKEN= # a single token that is used to authenticate all publish requests
# Read/Write Tokens
READ_ACCESS_TOKEN= # a token that is used to authenticate read requests
WRITE_ACCESS_TOKEN= # a token that is used to authenticate write requests
# GitHub
GITHUB_AUTH= # set to any value to enable GitHub authentication
# If none of the above is set, no authentication is required, even for write requests
# STORAGE CONFIGURATION
# Set the variables of the storage you want to use in order to enable it
# S3
S3_ENDPOINT= # endpoint of the S3 bucket
S3_BUCKET_NAME= # name of the S3 bucket
S3_REGION= # region of the S3 bucket
S3_ACCESS_KEY= # access key of the S3 bucket
S3_SECRET_KEY= # secret key of the S3 bucket
# FS
FS_STORAGE_ROOT= # root directory of the filesystem storage
SENTRY_URL= # optional url of sentry error tracking

View file

@ -34,6 +34,7 @@ sha2 = "0.10.8"
rusty-s3 = "0.5.0"
reqwest = { version = "0.12.7", features = ["json", "rustls-tls"] }
constant_time_eq = "0.3.1"
tar = "0.4.41"
flate2 = "1.0.33"

View file

@ -1,95 +0,0 @@
use crate::AppState;
use actix_governor::{KeyExtractor, SimpleKeyExtractionError};
use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
error::Error as ActixError,
http::header::AUTHORIZATION,
middleware::Next,
web, HttpMessage, HttpResponse,
};
use serde::Deserialize;
#[derive(Debug, Copy, Clone, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub struct UserId(pub u64);
#[derive(Debug, Deserialize)]
struct UserResponse {
id: u64,
}
pub async fn authentication(
app_state: web::Data<AppState>,
req: ServiceRequest,
next: Next<impl MessageBody + 'static>,
) -> Result<ServiceResponse<impl MessageBody>, ActixError> {
let token = match req
.headers()
.get(AUTHORIZATION)
.map(|token| token.to_str().unwrap())
{
Some(token) => token,
None => {
return Ok(req
.into_response(HttpResponse::Unauthorized().finish())
.map_into_right_body())
}
};
let token = if token.to_lowercase().starts_with("bearer ") {
token.to_string()
} else {
format!("Bearer {token}")
};
let response = match app_state
.reqwest_client
.get("https://api.github.com/user")
.header(reqwest::header::AUTHORIZATION, token)
.send()
.await
.and_then(|res| res.error_for_status())
{
Ok(response) => response,
Err(e) if e.status() == Some(reqwest::StatusCode::UNAUTHORIZED) => {
return Ok(req
.into_response(HttpResponse::Unauthorized().finish())
.map_into_right_body())
}
Err(e) => {
log::error!("failed to get user: {e}");
return Ok(req
.into_response(HttpResponse::InternalServerError().finish())
.map_into_right_body());
}
};
let user_id = match response.json::<UserResponse>().await {
Ok(user) => user.id,
Err(_) => {
return Ok(req
.into_response(HttpResponse::Unauthorized().finish())
.map_into_right_body())
}
};
req.extensions_mut().insert(UserId(user_id));
let res = next.call(req).await?;
Ok(res.map_into_left_body())
}
#[derive(Debug, Clone)]
pub struct UserIdExtractor;
impl KeyExtractor for UserIdExtractor {
type Key = UserId;
type KeyExtractionError = SimpleKeyExtractionError<&'static str>;
fn extract(&self, req: &ServiceRequest) -> Result<Self::Key, Self::KeyExtractionError> {
match req.extensions().get::<UserId>() {
Some(user_id) => Ok(*user_id),
None => Err(SimpleKeyExtractionError::new("UserId not found")),
}
}
}

View file

@ -0,0 +1,54 @@
use crate::auth::{get_token_from_req, AuthImpl, UserId};
use actix_web::{dev::ServiceRequest, Error as ActixError};
use serde::Deserialize;
use std::fmt::Display;
#[derive(Debug)]
pub struct GitHubAuth {
pub reqwest_client: reqwest::Client,
}
impl AuthImpl for GitHubAuth {
async fn for_write_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
let token = match get_token_from_req(req, true) {
Some(token) => token,
None => return Ok(None),
};
let response = match self
.reqwest_client
.get("https://api.github.com/user")
.header(reqwest::header::AUTHORIZATION, token)
.send()
.await
.and_then(|res| res.error_for_status())
{
Ok(response) => response,
Err(e) => {
log::error!("failed to get user: {e}");
return Ok(None);
}
};
let user_id = match response.json::<UserResponse>().await {
Ok(user) => user.id,
Err(e) => {
log::error!("failed to get user: {e}");
return Ok(None);
}
};
Ok(Some(UserId(user_id)))
}
}
impl Display for GitHubAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "GitHub")
}
}
#[derive(Debug, Deserialize)]
struct UserResponse {
id: u64,
}

188
registry/src/auth/mod.rs Normal file
View file

@ -0,0 +1,188 @@
mod github;
mod none;
mod rw_token;
mod token;
use crate::{benv, make_reqwest, AppState};
use actix_governor::{KeyExtractor, SimpleKeyExtractionError};
use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
error::Error as ActixError,
http::header::AUTHORIZATION,
middleware::Next,
web, HttpMessage, HttpResponse,
};
use sha2::{Digest, Sha256};
use std::fmt::Display;
#[derive(Debug, Copy, Clone, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub struct UserId(pub u64);
impl UserId {
// there isn't any account on GitHub that has the ID 0, so it should be safe to use it
pub const DEFAULT: UserId = UserId(0);
}
#[derive(Debug, Clone)]
pub struct UserIdExtractor;
impl KeyExtractor for UserIdExtractor {
type Key = UserId;
type KeyExtractionError = SimpleKeyExtractionError<&'static str>;
fn extract(&self, req: &ServiceRequest) -> Result<Self::Key, Self::KeyExtractionError> {
match req.extensions().get::<UserId>() {
Some(user_id) => Ok(*user_id),
None => Err(SimpleKeyExtractionError::new("UserId not found")),
}
}
}
#[derive(Debug)]
pub enum Auth {
GitHub(github::GitHubAuth),
None(none::NoneAuth),
Token(token::TokenAuth),
RwToken(rw_token::RwTokenAuth),
}
pub trait AuthImpl: Display {
async fn for_write_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError>;
async fn for_read_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
self.for_write_request(req).await
}
fn read_needs_auth(&self) -> bool {
false
}
}
impl AuthImpl for Auth {
async fn for_write_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
match self {
Auth::GitHub(github) => github.for_write_request(req).await,
Auth::None(none) => none.for_write_request(req).await,
Auth::Token(token) => token.for_write_request(req).await,
Auth::RwToken(rw_token) => rw_token.for_write_request(req).await,
}
}
async fn for_read_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
match self {
Auth::GitHub(github) => github.for_read_request(req).await,
Auth::None(none) => none.for_write_request(req).await,
Auth::Token(token) => token.for_write_request(req).await,
Auth::RwToken(rw_token) => rw_token.for_read_request(req).await,
}
}
fn read_needs_auth(&self) -> bool {
match self {
Auth::GitHub(github) => github.read_needs_auth(),
Auth::None(none) => none.read_needs_auth(),
Auth::Token(token) => token.read_needs_auth(),
Auth::RwToken(rw_token) => rw_token.read_needs_auth(),
}
}
}
impl Display for Auth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Auth::GitHub(github) => write!(f, "{}", github),
Auth::None(none) => write!(f, "{}", none),
Auth::Token(token) => write!(f, "{}", token),
Auth::RwToken(rw_token) => write!(f, "{}", rw_token),
}
}
}
pub async fn write_mw(
app_state: web::Data<AppState>,
req: ServiceRequest,
next: Next<impl MessageBody + 'static>,
) -> Result<ServiceResponse<impl MessageBody>, ActixError> {
let user_id = match app_state.auth.for_write_request(&req).await? {
Some(user_id) => user_id,
None => {
return Ok(req
.into_response(HttpResponse::Unauthorized().finish())
.map_into_right_body())
}
};
req.extensions_mut().insert(user_id);
next.call(req).await.map(|res| res.map_into_left_body())
}
pub async fn read_mw(
app_state: web::Data<AppState>,
req: ServiceRequest,
next: Next<impl MessageBody + 'static>,
) -> Result<ServiceResponse<impl MessageBody>, ActixError> {
if app_state.auth.read_needs_auth() {
let user_id = match app_state.auth.for_read_request(&req).await? {
Some(user_id) => user_id,
None => {
return Ok(req
.into_response(HttpResponse::Unauthorized().finish())
.map_into_right_body())
}
};
req.extensions_mut().insert(Some(user_id));
} else {
req.extensions_mut().insert(None::<UserId>);
}
next.call(req).await.map(|res| res.map_into_left_body())
}
pub fn get_auth_from_env() -> Auth {
if let Ok(token) = benv!("ACCESS_TOKEN") {
Auth::Token(token::TokenAuth {
token: *Sha256::digest(token.as_bytes()).as_ref(),
})
} else if benv!("GITHUB_AUTH").is_ok() {
Auth::GitHub(github::GitHubAuth {
reqwest_client: make_reqwest(),
})
} else if let Ok((r, w)) =
benv!("READ_ACCESS_TOKEN").and_then(|r| benv!("WRITE_ACCESS_TOKEN").map(|w| (r, w)))
{
Auth::RwToken(rw_token::RwTokenAuth {
read_token: *Sha256::digest(r.as_bytes()).as_ref(),
write_token: *Sha256::digest(w.as_bytes()).as_ref(),
})
} else {
Auth::None(none::NoneAuth)
}
}
pub fn get_token_from_req(req: &ServiceRequest, bearer: bool) -> Option<String> {
let token = match req
.headers()
.get(AUTHORIZATION)
.and_then(|token| token.to_str().ok())
{
Some(token) => token,
None => return None,
};
let token = if bearer {
if token.to_lowercase().starts_with("bearer ") {
token.to_string()
} else {
format!("Bearer {token}")
}
} else if token.to_lowercase().starts_with("bearer ") {
token[7..].to_string()
} else {
token.to_string()
};
Some(token)
}

18
registry/src/auth/none.rs Normal file
View file

@ -0,0 +1,18 @@
use crate::auth::{AuthImpl, UserId};
use actix_web::{dev::ServiceRequest, Error as ActixError};
use std::fmt::Display;
#[derive(Debug)]
pub struct NoneAuth;
impl AuthImpl for NoneAuth {
async fn for_write_request(&self, _req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
Ok(Some(UserId::DEFAULT))
}
}
impl Display for NoneAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "None")
}
}

View file

@ -0,0 +1,53 @@
use crate::auth::{get_token_from_req, AuthImpl, UserId};
use actix_web::{dev::ServiceRequest, Error as ActixError};
use constant_time_eq::constant_time_eq_32;
use sha2::{Digest, Sha256};
use std::fmt::Display;
#[derive(Debug)]
pub struct RwTokenAuth {
pub read_token: [u8; 32],
pub write_token: [u8; 32],
}
impl AuthImpl for RwTokenAuth {
async fn for_write_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
let token = match get_token_from_req(req, false) {
Some(token) => token,
None => return Ok(None),
};
let token: [u8; 32] = Sha256::digest(token.as_bytes()).into();
Ok(if constant_time_eq_32(&self.write_token, &token) {
Some(UserId::DEFAULT)
} else {
None
})
}
async fn for_read_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
let token = match get_token_from_req(req, false) {
Some(token) => token,
None => return Ok(None),
};
let token: [u8; 32] = Sha256::digest(token.as_bytes()).into();
Ok(if constant_time_eq_32(&self.read_token, &token) {
Some(UserId::DEFAULT)
} else {
None
})
}
fn read_needs_auth(&self) -> bool {
true
}
}
impl Display for RwTokenAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RwToken")
}
}

View file

@ -0,0 +1,34 @@
use crate::auth::{get_token_from_req, AuthImpl, UserId};
use actix_web::{dev::ServiceRequest, Error as ActixError};
use constant_time_eq::constant_time_eq_32;
use sha2::{Digest, Sha256};
use std::fmt::Display;
#[derive(Debug)]
pub struct TokenAuth {
// needs to be an SHA-256 hash
pub token: [u8; 32],
}
impl AuthImpl for TokenAuth {
async fn for_write_request(&self, req: &ServiceRequest) -> Result<Option<UserId>, ActixError> {
let token = match get_token_from_req(req, false) {
Some(token) => token,
None => return Ok(None),
};
let token: [u8; 32] = Sha256::digest(token.as_bytes()).into();
Ok(if constant_time_eq_32(&self.token, &token) {
Some(UserId::DEFAULT)
} else {
None
})
}
}
impl Display for TokenAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Token")
}
}

View file

@ -1,16 +1,8 @@
use actix_web::{
http::header::{ACCEPT, LOCATION},
web, HttpRequest, HttpResponse, Responder,
};
use rusty_s3::{actions::GetObject, S3Action};
use actix_web::{http::header::ACCEPT, web, HttpRequest, HttpResponse, Responder};
use semver::Version;
use serde::{Deserialize, Deserializer};
use crate::{
error::Error,
package::{s3_doc_name, s3_name, PackageResponse, S3_SIGN_DURATION},
AppState,
};
use crate::{error::Error, package::PackageResponse, storage::StorageImpl, AppState};
use pesde::{
manifest::target::TargetKind,
names::PackageName,
@ -136,16 +128,7 @@ pub async fn get_package_version(
return Ok(HttpResponse::NotFound().finish());
};
let object_url = GetObject::new(
&app_state.s3_bucket,
Some(&app_state.s3_credentials),
&s3_doc_name(&hash),
)
.sign(S3_SIGN_DURATION);
return Ok(HttpResponse::TemporaryRedirect()
.append_header((LOCATION, object_url.as_str()))
.finish());
return app_state.storage.get_doc(&hash).await;
}
let accept = request
@ -159,16 +142,11 @@ pub async fn get_package_version(
});
if let Some(readme) = accept {
let object_url = GetObject::new(
&app_state.s3_bucket,
Some(&app_state.s3_credentials),
&s3_name(&name, v_id, readme),
)
.sign(S3_SIGN_DURATION);
return Ok(HttpResponse::TemporaryRedirect()
.append_header((LOCATION, object_url.as_str()))
.finish());
return if readme {
app_state.storage.get_readme(&name, v_id).await
} else {
app_state.storage.get_package(&name, v_id).await
};
}
let response = PackageResponse {

View file

@ -2,10 +2,8 @@ use actix_multipart::Multipart;
use actix_web::{web, HttpResponse, Responder};
use convert_case::{Case, Casing};
use flate2::read::GzDecoder;
use futures::{future::join_all, StreamExt};
use futures::{future::join_all, join, StreamExt};
use git2::{Remote, Repository, Signature};
use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
use rusty_s3::{actions::PutObject, S3Action};
use serde::Deserialize;
use sha2::{Digest, Sha256};
use std::{
@ -19,8 +17,8 @@ use crate::{
auth::UserId,
benv,
error::{Error, ErrorResponse},
package::{s3_doc_name, s3_name, S3_SIGN_DURATION},
search::update_version,
storage::StorageImpl,
AppState,
};
use pesde::{
@ -454,57 +452,29 @@ pub async fn publish_package(
let version_id = VersionId::new(manifest.version.clone(), manifest.target.kind());
let (a, b, c) = join!(
app_state
.storage
.store_package(&manifest.name, &version_id, bytes.to_vec(),),
join_all(
std::iter::once({
let object_url = PutObject::new(
&app_state.s3_bucket,
Some(&app_state.s3_credentials),
&s3_name(&manifest.name, &version_id, false),
)
.sign(S3_SIGN_DURATION);
docs_pages
.into_iter()
.map(|(hash, content)| app_state.storage.store_doc(hash, content)),
),
async {
if let Some(readme) = readme {
app_state
.reqwest_client
.put(object_url)
.header(CONTENT_TYPE, "application/gzip")
.header(CONTENT_ENCODING, "gzip")
.body(bytes)
.send()
})
.chain(docs_pages.into_iter().map(|(hash, content)| {
let object_url = PutObject::new(
&app_state.s3_bucket,
Some(&app_state.s3_credentials),
&s3_doc_name(&hash),
)
.sign(S3_SIGN_DURATION);
app_state
.reqwest_client
.put(object_url)
.header(CONTENT_TYPE, "text/plain")
.header(CONTENT_ENCODING, "gzip")
.body(content)
.send()
}))
.chain(readme.map(|readme| {
let object_url = PutObject::new(
&app_state.s3_bucket,
Some(&app_state.s3_credentials),
&s3_name(&manifest.name, &version_id, true),
)
.sign(S3_SIGN_DURATION);
app_state
.reqwest_client
.put(object_url)
.header(CONTENT_TYPE, "text/plain")
.header(CONTENT_ENCODING, "gzip")
.body(readme)
.send()
})),
)
.await;
.storage
.store_readme(&manifest.name, &version_id, readme)
.await
} else {
Ok(())
}
}
);
a?;
b.into_iter().collect::<Result<(), _>>()?;
c?;
Ok(HttpResponse::Ok().body(format!(
"published {}@{} {}",

View file

@ -6,7 +6,6 @@ use actix_web::{
web, App, HttpServer,
};
use log::info;
use rusty_s3::{Bucket, Credentials, UrlStyle};
use std::{env::current_dir, fs::create_dir_all, path::PathBuf, sync::Mutex};
use pesde::{
@ -14,21 +13,35 @@ use pesde::{
AuthConfig, Project,
};
use crate::{auth::UserIdExtractor, search::make_search};
use crate::{
auth::{get_auth_from_env, Auth, UserIdExtractor},
search::make_search,
storage::{get_storage_from_env, Storage},
};
mod auth;
mod endpoints;
mod error;
mod package;
mod search;
mod storage;
pub fn make_reqwest() -> reqwest::Client {
reqwest::ClientBuilder::new()
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
.build()
.unwrap()
}
pub struct AppState {
pub s3_bucket: Bucket,
pub s3_credentials: Credentials,
pub source: Mutex<PesdePackageSource>,
pub project: Project,
pub reqwest_client: reqwest::Client,
pub storage: Storage,
pub auth: Auth,
pub search_reader: tantivy::IndexReader,
pub search_writer: Mutex<tantivy::IndexWriter>,
@ -91,28 +104,18 @@ async fn run(with_sentry: bool) -> std::io::Result<()> {
let (search_reader, search_writer) = make_search(&project, &source);
let app_data = web::Data::new(AppState {
s3_bucket: Bucket::new(
benv!(parse required "S3_ENDPOINT"),
UrlStyle::Path,
benv!(required "S3_BUCKET_NAME"),
benv!(required "S3_REGION"),
)
.unwrap(),
s3_credentials: Credentials::new(
benv!(required "S3_ACCESS_KEY"),
benv!(required "S3_SECRET_KEY"),
),
source: Mutex::new(source),
project,
reqwest_client: reqwest::ClientBuilder::new()
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
.build()
.unwrap(),
storage: {
let storage = get_storage_from_env();
info!("storage: {storage}");
storage
},
auth: {
let auth = get_auth_from_env();
info!("auth: {auth}");
auth
},
search_reader,
search_writer: Mutex::new(search_writer),
@ -144,21 +147,30 @@ async fn run(with_sentry: bool) -> std::io::Result<()> {
)
.service(
web::scope("/v0")
.route("/search", web::get().to(endpoints::search::search_packages))
.route(
"/search",
web::get()
.to(endpoints::search::search_packages)
.wrap(from_fn(auth::read_mw)),
)
.route(
"/packages/{name}",
web::get().to(endpoints::package_versions::get_package_versions),
web::get()
.to(endpoints::package_versions::get_package_versions)
.wrap(from_fn(auth::read_mw)),
)
.route(
"/packages/{name}/{version}/{target}",
web::get().to(endpoints::package_version::get_package_version),
web::get()
.to(endpoints::package_version::get_package_version)
.wrap(from_fn(auth::read_mw)),
)
.route(
"/packages",
web::post()
.to(endpoints::publish_version::publish_package)
.wrap(Governor::new(&publish_governor_config))
.wrap(from_fn(auth::authentication)),
.wrap(from_fn(auth::write_mw)),
),
)
})

View file

@ -1,26 +1,7 @@
use chrono::{DateTime, Utc};
use pesde::{
manifest::target::{Target, TargetKind},
names::PackageName,
source::version_id::VersionId,
};
use pesde::manifest::target::{Target, TargetKind};
use serde::Serialize;
use std::{collections::BTreeSet, time::Duration};
pub const S3_SIGN_DURATION: Duration = Duration::from_secs(60 * 3);
pub fn s3_name(name: &PackageName, version_id: &VersionId, is_readme: bool) -> String {
format!(
"{name}/{}/{}/{}.gz",
version_id.version(),
version_id.target(),
if is_readme { "readme" } else { "pkg.tar" },
)
}
pub fn s3_doc_name(doc_hash: &str) -> String {
format!("doc/{}.gz", doc_hash)
}
use std::collections::BTreeSet;
#[derive(Debug, Serialize, Eq, PartialEq)]
pub struct TargetInfo {

126
registry/src/storage/fs.rs Normal file
View file

@ -0,0 +1,126 @@
use crate::{error::Error, storage::StorageImpl};
use actix_web::{
http::header::{CONTENT_ENCODING, CONTENT_TYPE},
HttpResponse,
};
use pesde::{names::PackageName, source::version_id::VersionId};
use std::{fmt::Display, fs::create_dir_all, path::PathBuf};
#[derive(Debug)]
pub struct FSStorage {
pub root: PathBuf,
}
impl StorageImpl for FSStorage {
async fn store_package(
&self,
package_name: &PackageName,
version: &VersionId,
contents: Vec<u8>,
) -> Result<(), Error> {
let (scope, name) = package_name.as_str();
let path = self
.root
.join(scope)
.join(name)
.join(version.version().to_string())
.join(version.target().to_string());
create_dir_all(&path)?;
std::fs::write(path.join("pkg.tar.gz"), &contents)?;
Ok(())
}
async fn get_package(
&self,
package_name: &PackageName,
version: &VersionId,
) -> Result<HttpResponse, Error> {
let (scope, name) = package_name.as_str();
let path = self
.root
.join(scope)
.join(name)
.join(version.version().to_string())
.join(version.target().to_string());
let contents = std::fs::read(path.join("pkg.tar.gz"))?;
Ok(HttpResponse::Ok()
.append_header((CONTENT_TYPE, "application/gzip"))
.append_header((CONTENT_ENCODING, "gzip"))
.body(contents))
}
async fn store_readme(
&self,
package_name: &PackageName,
version: &VersionId,
contents: Vec<u8>,
) -> Result<(), Error> {
let (scope, name) = package_name.as_str();
let path = self
.root
.join(scope)
.join(name)
.join(version.version().to_string())
.join(version.target().to_string());
create_dir_all(&path)?;
std::fs::write(path.join("readme.gz"), &contents)?;
Ok(())
}
async fn get_readme(
&self,
package_name: &PackageName,
version: &VersionId,
) -> Result<HttpResponse, Error> {
let (scope, name) = package_name.as_str();
let path = self
.root
.join(scope)
.join(name)
.join(version.version().to_string())
.join(version.target().to_string());
let contents = std::fs::read(path.join("readme.gz"))?;
Ok(HttpResponse::Ok()
.append_header((CONTENT_TYPE, "text/plain"))
.append_header((CONTENT_ENCODING, "gzip"))
.body(contents))
}
async fn store_doc(&self, doc_hash: String, contents: Vec<u8>) -> Result<(), Error> {
let path = self.root.join("docs");
create_dir_all(&path)?;
std::fs::write(path.join(format!("{doc_hash}.gz")), &contents)?;
Ok(())
}
async fn get_doc(&self, doc_hash: &str) -> Result<HttpResponse, Error> {
let path = self.root.join("docs");
let contents = std::fs::read(path.join(format!("{doc_hash}.gz")))?;
Ok(HttpResponse::Ok()
.append_header((CONTENT_TYPE, "text/plain"))
.append_header((CONTENT_ENCODING, "gzip"))
.body(contents))
}
}
impl Display for FSStorage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "FS")
}
}

141
registry/src/storage/mod.rs Normal file
View file

@ -0,0 +1,141 @@
use crate::{benv, error::Error, make_reqwest};
use actix_web::HttpResponse;
use pesde::{names::PackageName, source::version_id::VersionId};
use rusty_s3::{Bucket, Credentials, UrlStyle};
use std::fmt::Display;
mod fs;
mod s3;
#[derive(Debug)]
pub enum Storage {
S3(s3::S3Storage),
FS(fs::FSStorage),
}
pub trait StorageImpl: Display {
async fn store_package(
&self,
package_name: &PackageName,
version: &VersionId,
contents: Vec<u8>,
) -> Result<(), crate::error::Error>;
async fn get_package(
&self,
package_name: &PackageName,
version: &VersionId,
) -> Result<HttpResponse, crate::error::Error>;
async fn store_readme(
&self,
package_name: &PackageName,
version: &VersionId,
contents: Vec<u8>,
) -> Result<(), crate::error::Error>;
async fn get_readme(
&self,
package_name: &PackageName,
version: &VersionId,
) -> Result<HttpResponse, crate::error::Error>;
async fn store_doc(
&self,
doc_hash: String,
contents: Vec<u8>,
) -> Result<(), crate::error::Error>;
async fn get_doc(&self, doc_hash: &str) -> Result<HttpResponse, crate::error::Error>;
}
impl StorageImpl for Storage {
async fn store_package(
&self,
package_name: &PackageName,
version: &VersionId,
contents: Vec<u8>,
) -> Result<(), Error> {
match self {
Storage::S3(s3) => s3.store_package(package_name, version, contents).await,
Storage::FS(fs) => fs.store_package(package_name, version, contents).await,
}
}
async fn get_package(
&self,
package_name: &PackageName,
version: &VersionId,
) -> Result<HttpResponse, Error> {
match self {
Storage::S3(s3) => s3.get_package(package_name, version).await,
Storage::FS(fs) => fs.get_package(package_name, version).await,
}
}
async fn store_readme(
&self,
package_name: &PackageName,
version: &VersionId,
contents: Vec<u8>,
) -> Result<(), Error> {
match self {
Storage::S3(s3) => s3.store_readme(package_name, version, contents).await,
Storage::FS(fs) => fs.store_readme(package_name, version, contents).await,
}
}
async fn get_readme(
&self,
package_name: &PackageName,
version: &VersionId,
) -> Result<HttpResponse, Error> {
match self {
Storage::S3(s3) => s3.get_readme(package_name, version).await,
Storage::FS(fs) => fs.get_readme(package_name, version).await,
}
}
async fn store_doc(&self, doc_hash: String, contents: Vec<u8>) -> Result<(), Error> {
match self {
Storage::S3(s3) => s3.store_doc(doc_hash, contents).await,
Storage::FS(fs) => fs.store_doc(doc_hash, contents).await,
}
}
async fn get_doc(&self, doc_hash: &str) -> Result<HttpResponse, Error> {
match self {
Storage::S3(s3) => s3.get_doc(doc_hash).await,
Storage::FS(fs) => fs.get_doc(doc_hash).await,
}
}
}
impl Display for Storage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Storage::S3(s3) => write!(f, "{}", s3),
Storage::FS(fs) => write!(f, "{}", fs),
}
}
}
pub fn get_storage_from_env() -> Storage {
if let Ok(endpoint) = benv!(parse "S3_ENDPOINT") {
Storage::S3(s3::S3Storage {
s3_bucket: Bucket::new(
endpoint,
UrlStyle::Path,
benv!(required "S3_BUCKET_NAME"),
benv!(required "S3_REGION"),
)
.unwrap(),
s3_credentials: Credentials::new(
benv!(required "S3_ACCESS_KEY"),
benv!(required "S3_SECRET_KEY"),
),
reqwest_client: make_reqwest(),
})
} else if let Ok(root) = benv!(parse "FS_STORAGE_ROOT") {
Storage::FS(fs::FSStorage { root })
} else {
panic!("no storage backend configured")
}
}

153
registry/src/storage/s3.rs Normal file
View file

@ -0,0 +1,153 @@
use crate::{error::Error, storage::StorageImpl};
use actix_web::{http::header::LOCATION, HttpResponse};
use pesde::{names::PackageName, source::version_id::VersionId};
use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
use rusty_s3::{actions::PutObject, Bucket, Credentials, S3Action};
use std::{fmt::Display, time::Duration};
#[derive(Debug)]
pub struct S3Storage {
pub s3_bucket: Bucket,
pub s3_credentials: Credentials,
pub reqwest_client: reqwest::Client,
}
pub const S3_SIGN_DURATION: Duration = Duration::from_secs(60 * 3);
impl StorageImpl for S3Storage {
async fn store_package(
&self,
package_name: &PackageName,
version: &VersionId,
contents: Vec<u8>,
) -> Result<(), Error> {
let object_url = PutObject::new(
&self.s3_bucket,
Some(&self.s3_credentials),
&format!(
"{package_name}/{}/{}/pkg.tar.gz",
version.version(),
version.target()
),
)
.sign(S3_SIGN_DURATION);
self.reqwest_client
.put(object_url)
.header(CONTENT_TYPE, "application/gzip")
.header(CONTENT_ENCODING, "gzip")
.body(contents)
.send()
.await?;
Ok(())
}
async fn get_package(
&self,
package_name: &PackageName,
version: &VersionId,
) -> Result<HttpResponse, Error> {
let object_url = PutObject::new(
&self.s3_bucket,
Some(&self.s3_credentials),
&format!(
"{package_name}/{}/{}/pkg.tar.gz",
version.version(),
version.target()
),
)
.sign(S3_SIGN_DURATION);
Ok(HttpResponse::TemporaryRedirect()
.append_header((LOCATION, object_url.as_str()))
.finish())
}
async fn store_readme(
&self,
package_name: &PackageName,
version: &VersionId,
contents: Vec<u8>,
) -> Result<(), Error> {
let object_url = PutObject::new(
&self.s3_bucket,
Some(&self.s3_credentials),
&format!(
"{package_name}/{}/{}/readme.gz",
version.version(),
version.target()
),
)
.sign(S3_SIGN_DURATION);
self.reqwest_client
.put(object_url)
.header(CONTENT_TYPE, "text/plain")
.header(CONTENT_ENCODING, "gzip")
.body(contents)
.send()
.await?;
Ok(())
}
async fn get_readme(
&self,
package_name: &PackageName,
version: &VersionId,
) -> Result<HttpResponse, Error> {
let object_url = PutObject::new(
&self.s3_bucket,
Some(&self.s3_credentials),
&format!(
"{package_name}/{}/{}/readme.gz",
version.version(),
version.target()
),
)
.sign(S3_SIGN_DURATION);
Ok(HttpResponse::TemporaryRedirect()
.append_header((LOCATION, object_url.as_str()))
.finish())
}
async fn store_doc(&self, doc_hash: String, contents: Vec<u8>) -> Result<(), Error> {
let object_url = PutObject::new(
&self.s3_bucket,
Some(&self.s3_credentials),
&format!("doc/{}.gz", doc_hash),
)
.sign(S3_SIGN_DURATION);
self.reqwest_client
.put(object_url)
.header(CONTENT_TYPE, "text/plain")
.header(CONTENT_ENCODING, "gzip")
.body(contents)
.send()
.await?;
Ok(())
}
async fn get_doc(&self, doc_hash: &str) -> Result<HttpResponse, Error> {
let object_url = PutObject::new(
&self.s3_bucket,
Some(&self.s3_credentials),
&format!("doc/{}.gz", doc_hash),
)
.sign(S3_SIGN_DURATION);
Ok(HttpResponse::TemporaryRedirect()
.append_header((LOCATION, object_url.as_str()))
.finish())
}
}
impl Display for S3Storage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "S3")
}
}