Figure out how to solve background tasks in new scheduler

This commit is contained in:
Filip Tibell 2023-08-20 21:53:21 -05:00
parent 70caf89295
commit 3f5371a7c1
4 changed files with 72 additions and 13 deletions

View file

@ -168,7 +168,9 @@ where
Ok(bound) => bound, Ok(bound) => bound,
}; };
// Start up our web server // Start up our web server
sched.schedule_future_background(async move { // TODO: Spawn a scheduler background task here,
// and communicate using an mpsc channel instead
sched.schedule_future_background_local(async move {
bound bound
.http1_only(true) // Web sockets can only use http1 .http1_only(true) // Web sockets can only use http1
.http1_keepalive(true) // Web sockets must be kept alive .http1_keepalive(true) // Web sockets must be kept alive

View file

@ -53,7 +53,7 @@ impl Service<Request<Body>> for NetServiceInner {
let sched = lua let sched = lua
.app_data_ref::<&Scheduler>() .app_data_ref::<&Scheduler>()
.expect("Lua struct is missing scheduler"); .expect("Lua struct is missing scheduler");
sched.schedule_future_background(async move { sched.schedule_future_background_local(async move {
// Create our new full websocket object, then // Create our new full websocket object, then
// schedule our handler to get called asap // schedule our handler to get called asap
let res = async move { let res = async move {

View file

@ -7,7 +7,6 @@ use std::{
use dunce::canonicalize; use dunce::canonicalize;
use mlua::prelude::*; use mlua::prelude::*;
use os_str_bytes::RawOsString; use os_str_bytes::RawOsString;
use tokio::task;
use crate::lune::{scheduler::Scheduler, util::TableBuilder}; use crate::lune::{scheduler::Scheduler, util::TableBuilder};
@ -164,12 +163,24 @@ async fn process_spawn(
lua: &Lua, lua: &Lua,
(program, args, options): (String, Option<Vec<String>>, ProcessSpawnOptions), (program, args, options): (String, Option<Vec<String>>, ProcessSpawnOptions),
) -> LuaResult<LuaTable> { ) -> LuaResult<LuaTable> {
// Spawn the new process in the background, /*
// letting the tokio runtime place it on a Spawn the new process in the background, letting the tokio
// different thread if necessary, and wait runtime place it on a different thread if possible / necessary
let (status, stdout, stderr) = task::spawn(spawn_command(program, args, options))
Note that we have to use our scheduler here, we can't
use anything like tokio::task::spawn because our lua
scheduler will not drive those futures to completion
*/
let sched = lua
.app_data_ref::<&Scheduler>()
.expect("Lua struct is missing scheduler");
let fut = spawn_command(program, args, options);
let recv = sched.schedule_future_background(fut);
let (status, stdout, stderr) = recv
.await .await
.expect("Spawned process should not be cancellable")?; .expect("Failed to receive result of spawned process")?;
// NOTE: If an exit code was not given by the child process, // NOTE: If an exit code was not given by the child process,
// we default to 1 if it yielded any error output, otherwise 0 // we default to 1 if it yielded any error output, otherwise 0

View file

@ -1,5 +1,9 @@
use futures_util::Future; use futures_util::Future;
use mlua::prelude::*; use mlua::prelude::*;
use tokio::{
sync::oneshot::{self, Receiver},
task,
};
use super::{IntoLuaThread, Scheduler}; use super::{IntoLuaThread, Scheduler};
@ -29,18 +33,60 @@ where
/** /**
Schedules a plain future to run in the background. Schedules a plain future to run in the background.
Note that this will keep the scheduler alive even This will spawn the future both on the scheduler and
if the future does not spawn any new lua threads. potentially on a different thread using [`task::spawn`],
meaning the provided future must implement [`Send`].
Returns a [`Receiver`] which may be `await`-ed
to retrieve the result of the spawned future.
This [`Receiver`] may be safely ignored if the result of the
spawned future is not needed, the future will run either way.
*/ */
pub fn schedule_future_background<F>(&self, fut: F) pub fn schedule_future_background<F, FR>(&self, fut: F) -> Receiver<FR>
where where
F: Future<Output = ()> + 'static, F: Future<Output = FR> + Send + 'static,
FR: Send + 'static,
{ {
let (tx, rx) = oneshot::channel();
let handle = task::spawn(async move {
let res = fut.await;
tx.send(res).ok();
});
let futs = self let futs = self
.futures_background .futures_background
.try_lock() .try_lock()
.expect("Failed to lock futures queue for background tasks"); .expect("Failed to lock futures queue for background tasks");
futs.push(Box::pin(fut)) futs.push(Box::pin(async move {
handle.await.ok();
}));
rx
}
/**
Equivalent to [`schedule_future_background`], except the
future is only spawned on the scheduler, on the main thread.
*/
pub fn schedule_future_background_local<F, FR>(&self, fut: F) -> Receiver<FR>
where
F: Future<Output = FR> + 'static,
FR: 'static,
{
let (tx, rx) = oneshot::channel();
let futs = self
.futures_background
.try_lock()
.expect("Failed to lock futures queue for background tasks");
futs.push(Box::pin(async move {
let res = fut.await;
tx.send(res).ok();
}));
rx
} }
/** /**