From 30c93994b50888aaeb32c65c90426e997800ccea Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= <blaz@mxxn.io>
Date: Fri, 14 Oct 2022 16:22:21 +0900
Subject: [PATCH] Use a single save_queue on the editor

---
 helix-term/src/application.rs    |  20 ++++-
 helix-term/src/commands.rs       |   3 +-
 helix-term/src/commands/typed.rs |  86 +++++++++++--------
 helix-term/src/compositor.rs     |  22 +++--
 helix-view/src/document.rs       | 140 +++++--------------------------
 helix-view/src/editor.rs         |  79 ++++++++++-------
 6 files changed, 159 insertions(+), 191 deletions(-)

diff --git a/helix-term/src/application.rs b/helix-term/src/application.rs
index 2e49e6d1..6010e745 100644
--- a/helix-term/src/application.rs
+++ b/helix-term/src/application.rs
@@ -1,5 +1,5 @@
 use arc_swap::{access::Map, ArcSwap};
-use futures_util::Stream;
+use futures_util::{Stream, StreamExt};
 use helix_core::{
     diagnostic::{DiagnosticTag, NumberOrString},
     path::get_relative_path,
@@ -968,6 +968,24 @@ impl Application {
         //        errors along the way
         let mut errs = Vec::new();
 
+        // TODO: deduplicate with ctx.block_try_flush_writes
+        tokio::task::block_in_place(|| {
+            helix_lsp::block_on(async {
+                while let Some(save_event) = self.editor.save_queue.next().await {
+                    match &save_event {
+                        Ok(event) => {
+                            let doc = doc_mut!(self.editor, &event.doc_id);
+                            doc.set_last_saved_revision(event.revision);
+                        }
+                        Err(err) => {
+                            log::error!("error saving document: {}", err);
+                        }
+                    };
+                    // TODO: if is_err: break?
+                }
+            })
+        });
+
         if let Err(err) = self
             .jobs
             .finish(&mut self.editor, Some(&mut self.compositor))
diff --git a/helix-term/src/commands.rs b/helix-term/src/commands.rs
index f6d583f5..87bbd6c6 100644
--- a/helix-term/src/commands.rs
+++ b/helix-term/src/commands.rs
@@ -2541,7 +2541,8 @@ async fn make_format_callback(
         }
 
         if let Some((path, force)) = write {
-            if let Err(err) = doc.save(path, force) {
+            let id = doc.id();
+            if let Err(err) = editor.save(id, path, force) {
                 editor.set_error(format!("Error saving: {}", err));
             }
         }
diff --git a/helix-term/src/commands/typed.rs b/helix-term/src/commands/typed.rs
index 070215cb..ef774256 100644
--- a/helix-term/src/commands/typed.rs
+++ b/helix-term/src/commands/typed.rs
@@ -79,12 +79,28 @@ fn buffer_close_by_ids_impl(
     doc_ids: &[DocumentId],
     force: bool,
 ) -> anyhow::Result<()> {
+    // TODO: deduplicate with ctx.block_try_flush_writes
+    tokio::task::block_in_place(|| {
+        helix_lsp::block_on(async {
+            while let Some(save_event) = editor.save_queue.next().await {
+                match &save_event {
+                    Ok(event) => {
+                        let doc = doc_mut!(editor, &event.doc_id);
+                        doc.set_last_saved_revision(event.revision);
+                    }
+                    Err(err) => {
+                        log::error!("error saving document: {}", err);
+                    }
+                };
+                // TODO: if is_err: break?
+            }
+        })
+    });
+
     let (modified_ids, modified_names): (Vec<_>, Vec<_>) = doc_ids
         .iter()
         .filter_map(|&doc_id| {
-            if let Err(CloseError::BufferModified(name)) = tokio::task::block_in_place(|| {
-                helix_lsp::block_on(editor.close_document(doc_id, force))
-            }) {
+            if let Err(CloseError::BufferModified(name)) = editor.close_document(doc_id, force) {
                 Some((doc_id, name))
             } else {
                 None
@@ -289,7 +305,8 @@ fn write_impl(
     };
 
     if fmt.is_none() {
-        doc.save(path, force)?;
+        let id = doc.id();
+        cx.editor.save(id, path, force)?;
     }
 
     Ok(())
@@ -569,40 +586,45 @@ fn write_all_impl(
         return Ok(());
     }
 
-    let mut errors: Option<String> = None;
+    let mut errors: Vec<&'static str> = Vec::new();
     let auto_format = cx.editor.config().auto_format;
     let jobs = &mut cx.jobs;
 
     // save all documents
-    for doc in &mut cx.editor.documents.values_mut() {
-        if doc.path().is_none() {
-            errors = errors
-                .or_else(|| Some(String::new()))
-                .map(|mut errs: String| {
-                    errs.push_str("cannot write a buffer without a filename\n");
-                    errs
-                });
+    let saves: Vec<_> = cx
+        .editor
+        .documents
+        .values()
+        .filter_map(|doc| {
+            if doc.path().is_none() {
+                errors.push("cannot write a buffer without a filename\n");
+                return None;
+            }
 
-            continue;
-        }
+            if !doc.is_modified() {
+                return None;
+            }
 
-        if !doc.is_modified() {
-            continue;
-        }
+            let fmt = if auto_format {
+                doc.auto_format().map(|fmt| {
+                    let callback =
+                        make_format_callback(doc.id(), doc.version(), fmt, Some((None, force)));
+                    jobs.add(Job::with_callback(callback).wait_before_exiting());
+                })
+            } else {
+                None
+            };
 
-        let fmt = if auto_format {
-            doc.auto_format().map(|fmt| {
-                let callback =
-                    make_format_callback(doc.id(), doc.version(), fmt, Some((None, force)));
-                jobs.add(Job::with_callback(callback).wait_before_exiting());
-            })
-        } else {
+            if fmt.is_none() {
+                return Some(doc.id());
+            }
             None
-        };
+        })
+        .collect();
 
-        if fmt.is_none() {
-            doc.save::<PathBuf>(None, force)?;
-        }
+    // manually call save for the rest of docs that don't have a formatter
+    for id in saves {
+        cx.editor.save::<PathBuf>(id, None, force)?;
     }
 
     if quit {
@@ -619,10 +641,8 @@ fn write_all_impl(
         }
     }
 
-    if let Some(errs) = errors {
-        if !force {
-            bail!(errs);
-        }
+    if !errors.is_empty() && !force {
+        bail!("{:?}", errors);
     }
 
     Ok(())
diff --git a/helix-term/src/compositor.rs b/helix-term/src/compositor.rs
index 35b9d054..a4ffaff2 100644
--- a/helix-term/src/compositor.rs
+++ b/helix-term/src/compositor.rs
@@ -1,3 +1,4 @@
+use futures_util::StreamExt;
 // Each component declares it's own size constraints and gets fitted based on it's parent.
 // Q: how does this work with popups?
 // cursive does compositor.screen_mut().add_layer_at(pos::absolute(x, y), <component>)
@@ -33,11 +34,22 @@ impl<'a> Context<'a> {
     pub fn block_try_flush_writes(&mut self) -> anyhow::Result<()> {
         tokio::task::block_in_place(|| helix_lsp::block_on(self.jobs.finish(self.editor, None)))?;
 
-        for doc in &mut self.editor.documents.values_mut() {
-            tokio::task::block_in_place(|| helix_lsp::block_on(doc.try_flush_saves()))
-                .map(|result| result.map(|_| ()))
-                .unwrap_or(Ok(()))?;
-        }
+        tokio::task::block_in_place(|| {
+            helix_lsp::block_on(async {
+                while let Some(save_event) = self.editor.save_queue.next().await {
+                    match &save_event {
+                        Ok(event) => {
+                            let doc = doc_mut!(self.editor, &event.doc_id);
+                            doc.set_last_saved_revision(event.revision);
+                        }
+                        Err(err) => {
+                            log::error!("error saving document: {}", err);
+                        }
+                    };
+                    // TODO: if is_err: break?
+                }
+            })
+        });
 
         Ok(())
     }
diff --git a/helix-view/src/document.rs b/helix-view/src/document.rs
index 0774e516..9fa1241e 100644
--- a/helix-view/src/document.rs
+++ b/helix-view/src/document.rs
@@ -13,10 +13,6 @@ use std::future::Future;
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::sync::Arc;
-use tokio::sync::mpsc::error::TryRecvError;
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
-
-use tokio::sync::Mutex;
 
 use helix_core::{
     encoding,
@@ -134,9 +130,6 @@ pub struct Document {
     last_saved_revision: usize,
     version: i32, // should be usize?
     pub(crate) modified_since_accessed: bool,
-    save_sender: Option<UnboundedSender<DocumentSavedEventFuture>>,
-    save_receiver: Option<UnboundedReceiver<DocumentSavedEventFuture>>,
-    current_save: Arc<Mutex<Option<DocumentSavedEventFuture>>>,
 
     diagnostics: Vec<Diagnostic>,
     language_server: Option<Arc<helix_lsp::Client>>,
@@ -357,7 +350,6 @@ impl Document {
         let encoding = encoding.unwrap_or(encoding::UTF_8);
         let changes = ChangeSet::new(&text);
         let old_state = None;
-        let (save_sender, save_receiver) = tokio::sync::mpsc::unbounded_channel();
 
         Self {
             id: DocumentId::default(),
@@ -378,9 +370,6 @@ impl Document {
             savepoint: None,
             last_saved_revision: 0,
             modified_since_accessed: false,
-            save_sender: Some(save_sender),
-            save_receiver: Some(save_receiver),
-            current_save: Arc::new(Mutex::new(None)),
             language_server: None,
         }
     }
@@ -519,21 +508,26 @@ impl Document {
         &mut self,
         path: Option<P>,
         force: bool,
-    ) -> Result<(), anyhow::Error> {
-        self.save_impl::<futures_util::future::Ready<_>, _>(path, force)
+    ) -> Result<
+        impl Future<Output = Result<DocumentSavedEvent, anyhow::Error>> + 'static + Send,
+        anyhow::Error,
+    > {
+        let path = path.map(|path| path.into());
+        self.save_impl(path, force)
+
+        // futures_util::future::Ready<_>,
     }
 
     /// The `Document`'s text is encoded according to its encoding and written to the file located
     /// at its `path()`.
-    fn save_impl<F, P>(&mut self, path: Option<P>, force: bool) -> Result<(), anyhow::Error>
-    where
-        F: Future<Output = Result<Transaction, FormatterError>> + 'static + Send,
-        P: Into<PathBuf>,
-    {
-        if self.save_sender.is_none() {
-            bail!("saves are closed for this document!");
-        }
-
+    fn save_impl(
+        &mut self,
+        path: Option<PathBuf>,
+        force: bool,
+    ) -> Result<
+        impl Future<Output = Result<DocumentSavedEvent, anyhow::Error>> + 'static + Send,
+        anyhow::Error,
+    > {
         log::debug!(
             "submitting save of doc '{:?}'",
             self.path().map(|path| path.to_string_lossy())
@@ -544,7 +538,7 @@ impl Document {
         let text = self.text().clone();
 
         let path = match path {
-            Some(path) => helix_core::path::get_canonicalized_path(&path.into())?,
+            Some(path) => helix_core::path::get_canonicalized_path(&path)?,
             None => {
                 if self.path.is_none() {
                     bail!("Can't save with no path set!");
@@ -564,7 +558,7 @@ impl Document {
         let encoding = self.encoding;
 
         // We encode the file according to the `Document`'s encoding.
-        let save_event = async move {
+        let future = async move {
             use tokio::fs::File;
             if let Some(parent) = path.parent() {
                 // TODO: display a prompt asking the user if the directories should be created
@@ -604,107 +598,15 @@ impl Document {
             Ok(event)
         };
 
-        self.save_sender
-            .as_mut()
-            .unwrap()
-            .send(Box::pin(save_event))
-            .map_err(|err| anyhow!("failed to send save event: {}", err))
-    }
-
-    pub async fn await_save(&mut self) -> Option<DocumentSavedEventResult> {
-        self.await_save_impl(true).await
-    }
-
-    async fn await_save_impl(&mut self, block: bool) -> Option<DocumentSavedEventResult> {
-        let mut current_save = self.current_save.lock().await;
-        if let Some(ref mut save) = *current_save {
-            log::trace!("reawaiting save of '{:?}'", self.path());
-            let result = save.await;
-            *current_save = None;
-            log::trace!("reawait save of '{:?}' result: {:?}", self.path(), result);
-            return Some(result);
-        }
-
-        // return early if the receiver is closed
-        let rx = self.save_receiver.as_mut()?;
-
-        let save_req = if block {
-            rx.recv().await
-        } else {
-            let msg = rx.try_recv();
-
-            if let Err(err) = msg {
-                match err {
-                    TryRecvError::Empty => return None,
-                    TryRecvError::Disconnected => None,
-                }
-            } else {
-                msg.ok()
-            }
-        };
-
-        let save = match save_req {
-            Some(save) => save,
-            None => {
-                self.save_receiver = None;
-                return None;
-            }
-        };
-
-        // save a handle to the future so that when a poll on this
-        // function gets cancelled, we don't lose it
-        *current_save = Some(save);
-        log::trace!("awaiting save of '{:?}'", self.path());
-
-        let result = (*current_save).as_mut().unwrap().await;
-        *current_save = None;
-
-        log::trace!("save of '{:?}' result: {:?}", self.path(), result);
-
-        Some(result)
-    }
-
-    /// Flushes the queue of pending writes. If any fail,
-    /// it stops early before emptying the rest of the queue.
-    pub async fn try_flush_saves(&mut self) -> Option<DocumentSavedEventResult> {
-        self.flush_saves_impl(false).await
-    }
-
-    async fn flush_saves_impl(&mut self, block: bool) -> Option<DocumentSavedEventResult> {
-        let mut final_result = None;
-
-        while let Some(save_event) = self.await_save_impl(block).await {
-            let is_err = match &save_event {
-                Ok(event) => {
-                    self.set_last_saved_revision(event.revision);
-                    false
-                }
-                Err(err) => {
-                    log::error!(
-                        "error saving document {:?}: {}",
-                        self.path().map(|path| path.to_string_lossy()),
-                        err
-                    );
-                    true
-                }
-            };
-
-            final_result = Some(save_event);
-
-            if is_err {
-                break;
-            }
-        }
-
-        final_result
+        Ok(future)
     }
 
     /// Prepares the Document for being closed by stopping any new writes
     /// and flushing through the queue of pending writes. If any fail,
     /// it stops early before emptying the rest of the queue.
     pub async fn close(&mut self) -> Option<DocumentSavedEventResult> {
-        self.save_sender.take();
-        self.flush_saves_impl(true).await
+        // TODO
+        None
     }
 
     /// Detect the programming language based on the file type.
diff --git a/helix-view/src/editor.rs b/helix-view/src/editor.rs
index fbd0b2b0..c4789ee2 100644
--- a/helix-view/src/editor.rs
+++ b/helix-view/src/editor.rs
@@ -1,6 +1,6 @@
 use crate::{
     clipboard::{get_clipboard_provider, ClipboardProvider},
-    document::{DocumentSavedEventResult, Mode},
+    document::{DocumentSavedEventFuture, DocumentSavedEventResult, Mode},
     graphics::{CursorKind, Rect},
     info::Info,
     input::KeyEvent,
@@ -9,7 +9,7 @@ use crate::{
     Document, DocumentId, View, ViewId,
 };
 
-use futures_util::stream::{select_all::SelectAll, FuturesUnordered};
+use futures_util::stream::select_all::SelectAll;
 use futures_util::{future, StreamExt};
 use helix_lsp::Call;
 use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -29,7 +29,7 @@ use tokio::{
     time::{sleep, Duration, Instant, Sleep},
 };
 
-use anyhow::Error;
+use anyhow::{anyhow, Error};
 
 pub use helix_core::diagnostic::Severity;
 pub use helix_core::register::Registers;
@@ -644,12 +644,20 @@ pub struct Breakpoint {
     pub log_message: Option<String>,
 }
 
+use futures_util::stream::{Flatten, Once};
+
 pub struct Editor {
     /// Current editing mode.
     pub mode: Mode,
     pub tree: Tree,
     pub next_document_id: DocumentId,
     pub documents: BTreeMap<DocumentId, Document>,
+
+    // We Flatten<> to resolve the inner DocumentSavedEventFuture. For that we need a stream of streams, hence the Once<>.
+    // https://stackoverflow.com/a/66875668
+    pub saves: HashMap<DocumentId, UnboundedSender<Once<DocumentSavedEventFuture>>>,
+    pub save_queue: SelectAll<Flatten<UnboundedReceiverStream<Once<DocumentSavedEventFuture>>>>,
+
     pub count: Option<std::num::NonZeroUsize>,
     pub selected_register: Option<char>,
     pub registers: Registers,
@@ -751,6 +759,8 @@ impl Editor {
             tree: Tree::new(area),
             next_document_id: DocumentId::default(),
             documents: BTreeMap::new(),
+            saves: HashMap::new(),
+            save_queue: SelectAll::new(),
             count: None,
             selected_register: None,
             macro_recording: None,
@@ -1083,6 +1093,12 @@ impl Editor {
             self.new_document(doc)
         };
 
+        let (save_sender, save_receiver) = tokio::sync::mpsc::unbounded_channel();
+        self.saves.insert(id, save_sender);
+
+        let stream = UnboundedReceiverStream::new(save_receiver).flatten();
+        self.save_queue.push(stream);
+
         self.switch(id, action);
         Ok(id)
     }
@@ -1095,38 +1111,21 @@ impl Editor {
         self._refresh();
     }
 
-    pub async fn close_document(
-        &mut self,
-        doc_id: DocumentId,
-        force: bool,
-    ) -> Result<(), CloseError> {
+    pub fn close_document(&mut self, doc_id: DocumentId, force: bool) -> Result<(), CloseError> {
         let doc = match self.documents.get_mut(&doc_id) {
             Some(doc) => doc,
             None => return Err(CloseError::DoesNotExist),
         };
-
-        // flush out any pending writes first to clear the modified status
-        if let Some(Err(err)) = doc.try_flush_saves().await {
-            return Err(CloseError::SaveError(err));
-        }
-
         if !force && doc.is_modified() {
             return Err(CloseError::BufferModified(doc.display_name().into_owned()));
         }
 
-        if let Some(Err(err)) = doc.close().await {
-            return Err(CloseError::SaveError(err));
-        }
+        // This will also disallow any follow-up writes
+        self.saves.remove(&doc_id);
 
-        // Don't fail the whole write because the language server could not
-        // acknowledge the close
         if let Some(language_server) = doc.language_server() {
-            if let Err(err) = language_server
-                .text_document_did_close(doc.identifier())
-                .await
-            {
-                log::error!("Error closing doc in language server: {}", err);
-            }
+            // TODO: track error
+            tokio::spawn(language_server.text_document_did_close(doc.identifier()));
         }
 
         enum Action {
@@ -1188,6 +1187,28 @@ impl Editor {
         Ok(())
     }
 
+    pub fn save<P: Into<PathBuf>>(
+        &mut self,
+        doc_id: DocumentId,
+        path: Option<P>,
+        force: bool,
+    ) -> anyhow::Result<()> {
+        // convert a channel of futures to pipe into main queue one by one
+        // via stream.then() ? then push into main future
+
+        let path = path.map(|path| path.into());
+        let doc = doc_mut!(self, &doc_id);
+        let future = doc.save(path, force)?;
+        // TODO: if no self.saves for that doc id then bail
+        // bail!("saves are closed for this document!");
+        use futures_util::stream;
+        self.saves[&doc_id]
+            .send(stream::once(Box::pin(future)))
+            .map_err(|err| anyhow!("failed to send save event: {}", err))?;
+
+        Ok(())
+    }
+
     pub fn resize(&mut self, area: Rect) {
         if self.tree.resize(area) {
             self._refresh();
@@ -1307,16 +1328,10 @@ impl Editor {
     }
 
     pub async fn wait_event(&mut self) -> EditorEvent {
-        let mut saves: FuturesUnordered<_> = self
-            .documents
-            .values_mut()
-            .map(Document::await_save)
-            .collect();
-
         tokio::select! {
             biased;
 
-            Some(Some(event)) = saves.next() => {
+            Some(event) = self.save_queue.next() => {
                 EditorEvent::DocumentSaved(event)
             }
             Some(config_event) = self.config_events.1.recv() => {