Use a single save_queue on the editor
This commit is contained in:
parent
beb3427bfb
commit
30c93994b5
6 changed files with 159 additions and 191 deletions
helix-term/src
helix-view/src
|
@ -1,5 +1,5 @@
|
|||
use arc_swap::{access::Map, ArcSwap};
|
||||
use futures_util::Stream;
|
||||
use futures_util::{Stream, StreamExt};
|
||||
use helix_core::{
|
||||
diagnostic::{DiagnosticTag, NumberOrString},
|
||||
path::get_relative_path,
|
||||
|
@ -968,6 +968,24 @@ impl Application {
|
|||
// errors along the way
|
||||
let mut errs = Vec::new();
|
||||
|
||||
// TODO: deduplicate with ctx.block_try_flush_writes
|
||||
tokio::task::block_in_place(|| {
|
||||
helix_lsp::block_on(async {
|
||||
while let Some(save_event) = self.editor.save_queue.next().await {
|
||||
match &save_event {
|
||||
Ok(event) => {
|
||||
let doc = doc_mut!(self.editor, &event.doc_id);
|
||||
doc.set_last_saved_revision(event.revision);
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("error saving document: {}", err);
|
||||
}
|
||||
};
|
||||
// TODO: if is_err: break?
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
if let Err(err) = self
|
||||
.jobs
|
||||
.finish(&mut self.editor, Some(&mut self.compositor))
|
||||
|
|
|
@ -2541,7 +2541,8 @@ async fn make_format_callback(
|
|||
}
|
||||
|
||||
if let Some((path, force)) = write {
|
||||
if let Err(err) = doc.save(path, force) {
|
||||
let id = doc.id();
|
||||
if let Err(err) = editor.save(id, path, force) {
|
||||
editor.set_error(format!("Error saving: {}", err));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,12 +79,28 @@ fn buffer_close_by_ids_impl(
|
|||
doc_ids: &[DocumentId],
|
||||
force: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: deduplicate with ctx.block_try_flush_writes
|
||||
tokio::task::block_in_place(|| {
|
||||
helix_lsp::block_on(async {
|
||||
while let Some(save_event) = editor.save_queue.next().await {
|
||||
match &save_event {
|
||||
Ok(event) => {
|
||||
let doc = doc_mut!(editor, &event.doc_id);
|
||||
doc.set_last_saved_revision(event.revision);
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("error saving document: {}", err);
|
||||
}
|
||||
};
|
||||
// TODO: if is_err: break?
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
let (modified_ids, modified_names): (Vec<_>, Vec<_>) = doc_ids
|
||||
.iter()
|
||||
.filter_map(|&doc_id| {
|
||||
if let Err(CloseError::BufferModified(name)) = tokio::task::block_in_place(|| {
|
||||
helix_lsp::block_on(editor.close_document(doc_id, force))
|
||||
}) {
|
||||
if let Err(CloseError::BufferModified(name)) = editor.close_document(doc_id, force) {
|
||||
Some((doc_id, name))
|
||||
} else {
|
||||
None
|
||||
|
@ -289,7 +305,8 @@ fn write_impl(
|
|||
};
|
||||
|
||||
if fmt.is_none() {
|
||||
doc.save(path, force)?;
|
||||
let id = doc.id();
|
||||
cx.editor.save(id, path, force)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -569,40 +586,45 @@ fn write_all_impl(
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
let mut errors: Option<String> = None;
|
||||
let mut errors: Vec<&'static str> = Vec::new();
|
||||
let auto_format = cx.editor.config().auto_format;
|
||||
let jobs = &mut cx.jobs;
|
||||
|
||||
// save all documents
|
||||
for doc in &mut cx.editor.documents.values_mut() {
|
||||
if doc.path().is_none() {
|
||||
errors = errors
|
||||
.or_else(|| Some(String::new()))
|
||||
.map(|mut errs: String| {
|
||||
errs.push_str("cannot write a buffer without a filename\n");
|
||||
errs
|
||||
});
|
||||
let saves: Vec<_> = cx
|
||||
.editor
|
||||
.documents
|
||||
.values()
|
||||
.filter_map(|doc| {
|
||||
if doc.path().is_none() {
|
||||
errors.push("cannot write a buffer without a filename\n");
|
||||
return None;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
if !doc.is_modified() {
|
||||
return None;
|
||||
}
|
||||
|
||||
if !doc.is_modified() {
|
||||
continue;
|
||||
}
|
||||
let fmt = if auto_format {
|
||||
doc.auto_format().map(|fmt| {
|
||||
let callback =
|
||||
make_format_callback(doc.id(), doc.version(), fmt, Some((None, force)));
|
||||
jobs.add(Job::with_callback(callback).wait_before_exiting());
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let fmt = if auto_format {
|
||||
doc.auto_format().map(|fmt| {
|
||||
let callback =
|
||||
make_format_callback(doc.id(), doc.version(), fmt, Some((None, force)));
|
||||
jobs.add(Job::with_callback(callback).wait_before_exiting());
|
||||
})
|
||||
} else {
|
||||
if fmt.is_none() {
|
||||
return Some(doc.id());
|
||||
}
|
||||
None
|
||||
};
|
||||
})
|
||||
.collect();
|
||||
|
||||
if fmt.is_none() {
|
||||
doc.save::<PathBuf>(None, force)?;
|
||||
}
|
||||
// manually call save for the rest of docs that don't have a formatter
|
||||
for id in saves {
|
||||
cx.editor.save::<PathBuf>(id, None, force)?;
|
||||
}
|
||||
|
||||
if quit {
|
||||
|
@ -619,10 +641,8 @@ fn write_all_impl(
|
|||
}
|
||||
}
|
||||
|
||||
if let Some(errs) = errors {
|
||||
if !force {
|
||||
bail!(errs);
|
||||
}
|
||||
if !errors.is_empty() && !force {
|
||||
bail!("{:?}", errors);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use futures_util::StreamExt;
|
||||
// Each component declares it's own size constraints and gets fitted based on it's parent.
|
||||
// Q: how does this work with popups?
|
||||
// cursive does compositor.screen_mut().add_layer_at(pos::absolute(x, y), <component>)
|
||||
|
@ -33,11 +34,22 @@ impl<'a> Context<'a> {
|
|||
pub fn block_try_flush_writes(&mut self) -> anyhow::Result<()> {
|
||||
tokio::task::block_in_place(|| helix_lsp::block_on(self.jobs.finish(self.editor, None)))?;
|
||||
|
||||
for doc in &mut self.editor.documents.values_mut() {
|
||||
tokio::task::block_in_place(|| helix_lsp::block_on(doc.try_flush_saves()))
|
||||
.map(|result| result.map(|_| ()))
|
||||
.unwrap_or(Ok(()))?;
|
||||
}
|
||||
tokio::task::block_in_place(|| {
|
||||
helix_lsp::block_on(async {
|
||||
while let Some(save_event) = self.editor.save_queue.next().await {
|
||||
match &save_event {
|
||||
Ok(event) => {
|
||||
let doc = doc_mut!(self.editor, &event.doc_id);
|
||||
doc.set_last_saved_revision(event.revision);
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("error saving document: {}", err);
|
||||
}
|
||||
};
|
||||
// TODO: if is_err: break?
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -13,10 +13,6 @@ use std::future::Future;
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use helix_core::{
|
||||
encoding,
|
||||
|
@ -134,9 +130,6 @@ pub struct Document {
|
|||
last_saved_revision: usize,
|
||||
version: i32, // should be usize?
|
||||
pub(crate) modified_since_accessed: bool,
|
||||
save_sender: Option<UnboundedSender<DocumentSavedEventFuture>>,
|
||||
save_receiver: Option<UnboundedReceiver<DocumentSavedEventFuture>>,
|
||||
current_save: Arc<Mutex<Option<DocumentSavedEventFuture>>>,
|
||||
|
||||
diagnostics: Vec<Diagnostic>,
|
||||
language_server: Option<Arc<helix_lsp::Client>>,
|
||||
|
@ -357,7 +350,6 @@ impl Document {
|
|||
let encoding = encoding.unwrap_or(encoding::UTF_8);
|
||||
let changes = ChangeSet::new(&text);
|
||||
let old_state = None;
|
||||
let (save_sender, save_receiver) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
Self {
|
||||
id: DocumentId::default(),
|
||||
|
@ -378,9 +370,6 @@ impl Document {
|
|||
savepoint: None,
|
||||
last_saved_revision: 0,
|
||||
modified_since_accessed: false,
|
||||
save_sender: Some(save_sender),
|
||||
save_receiver: Some(save_receiver),
|
||||
current_save: Arc::new(Mutex::new(None)),
|
||||
language_server: None,
|
||||
}
|
||||
}
|
||||
|
@ -519,21 +508,26 @@ impl Document {
|
|||
&mut self,
|
||||
path: Option<P>,
|
||||
force: bool,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
self.save_impl::<futures_util::future::Ready<_>, _>(path, force)
|
||||
) -> Result<
|
||||
impl Future<Output = Result<DocumentSavedEvent, anyhow::Error>> + 'static + Send,
|
||||
anyhow::Error,
|
||||
> {
|
||||
let path = path.map(|path| path.into());
|
||||
self.save_impl(path, force)
|
||||
|
||||
// futures_util::future::Ready<_>,
|
||||
}
|
||||
|
||||
/// The `Document`'s text is encoded according to its encoding and written to the file located
|
||||
/// at its `path()`.
|
||||
fn save_impl<F, P>(&mut self, path: Option<P>, force: bool) -> Result<(), anyhow::Error>
|
||||
where
|
||||
F: Future<Output = Result<Transaction, FormatterError>> + 'static + Send,
|
||||
P: Into<PathBuf>,
|
||||
{
|
||||
if self.save_sender.is_none() {
|
||||
bail!("saves are closed for this document!");
|
||||
}
|
||||
|
||||
fn save_impl(
|
||||
&mut self,
|
||||
path: Option<PathBuf>,
|
||||
force: bool,
|
||||
) -> Result<
|
||||
impl Future<Output = Result<DocumentSavedEvent, anyhow::Error>> + 'static + Send,
|
||||
anyhow::Error,
|
||||
> {
|
||||
log::debug!(
|
||||
"submitting save of doc '{:?}'",
|
||||
self.path().map(|path| path.to_string_lossy())
|
||||
|
@ -544,7 +538,7 @@ impl Document {
|
|||
let text = self.text().clone();
|
||||
|
||||
let path = match path {
|
||||
Some(path) => helix_core::path::get_canonicalized_path(&path.into())?,
|
||||
Some(path) => helix_core::path::get_canonicalized_path(&path)?,
|
||||
None => {
|
||||
if self.path.is_none() {
|
||||
bail!("Can't save with no path set!");
|
||||
|
@ -564,7 +558,7 @@ impl Document {
|
|||
let encoding = self.encoding;
|
||||
|
||||
// We encode the file according to the `Document`'s encoding.
|
||||
let save_event = async move {
|
||||
let future = async move {
|
||||
use tokio::fs::File;
|
||||
if let Some(parent) = path.parent() {
|
||||
// TODO: display a prompt asking the user if the directories should be created
|
||||
|
@ -604,107 +598,15 @@ impl Document {
|
|||
Ok(event)
|
||||
};
|
||||
|
||||
self.save_sender
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.send(Box::pin(save_event))
|
||||
.map_err(|err| anyhow!("failed to send save event: {}", err))
|
||||
}
|
||||
|
||||
pub async fn await_save(&mut self) -> Option<DocumentSavedEventResult> {
|
||||
self.await_save_impl(true).await
|
||||
}
|
||||
|
||||
async fn await_save_impl(&mut self, block: bool) -> Option<DocumentSavedEventResult> {
|
||||
let mut current_save = self.current_save.lock().await;
|
||||
if let Some(ref mut save) = *current_save {
|
||||
log::trace!("reawaiting save of '{:?}'", self.path());
|
||||
let result = save.await;
|
||||
*current_save = None;
|
||||
log::trace!("reawait save of '{:?}' result: {:?}", self.path(), result);
|
||||
return Some(result);
|
||||
}
|
||||
|
||||
// return early if the receiver is closed
|
||||
let rx = self.save_receiver.as_mut()?;
|
||||
|
||||
let save_req = if block {
|
||||
rx.recv().await
|
||||
} else {
|
||||
let msg = rx.try_recv();
|
||||
|
||||
if let Err(err) = msg {
|
||||
match err {
|
||||
TryRecvError::Empty => return None,
|
||||
TryRecvError::Disconnected => None,
|
||||
}
|
||||
} else {
|
||||
msg.ok()
|
||||
}
|
||||
};
|
||||
|
||||
let save = match save_req {
|
||||
Some(save) => save,
|
||||
None => {
|
||||
self.save_receiver = None;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
// save a handle to the future so that when a poll on this
|
||||
// function gets cancelled, we don't lose it
|
||||
*current_save = Some(save);
|
||||
log::trace!("awaiting save of '{:?}'", self.path());
|
||||
|
||||
let result = (*current_save).as_mut().unwrap().await;
|
||||
*current_save = None;
|
||||
|
||||
log::trace!("save of '{:?}' result: {:?}", self.path(), result);
|
||||
|
||||
Some(result)
|
||||
}
|
||||
|
||||
/// Flushes the queue of pending writes. If any fail,
|
||||
/// it stops early before emptying the rest of the queue.
|
||||
pub async fn try_flush_saves(&mut self) -> Option<DocumentSavedEventResult> {
|
||||
self.flush_saves_impl(false).await
|
||||
}
|
||||
|
||||
async fn flush_saves_impl(&mut self, block: bool) -> Option<DocumentSavedEventResult> {
|
||||
let mut final_result = None;
|
||||
|
||||
while let Some(save_event) = self.await_save_impl(block).await {
|
||||
let is_err = match &save_event {
|
||||
Ok(event) => {
|
||||
self.set_last_saved_revision(event.revision);
|
||||
false
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"error saving document {:?}: {}",
|
||||
self.path().map(|path| path.to_string_lossy()),
|
||||
err
|
||||
);
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
final_result = Some(save_event);
|
||||
|
||||
if is_err {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
final_result
|
||||
Ok(future)
|
||||
}
|
||||
|
||||
/// Prepares the Document for being closed by stopping any new writes
|
||||
/// and flushing through the queue of pending writes. If any fail,
|
||||
/// it stops early before emptying the rest of the queue.
|
||||
pub async fn close(&mut self) -> Option<DocumentSavedEventResult> {
|
||||
self.save_sender.take();
|
||||
self.flush_saves_impl(true).await
|
||||
// TODO
|
||||
None
|
||||
}
|
||||
|
||||
/// Detect the programming language based on the file type.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
clipboard::{get_clipboard_provider, ClipboardProvider},
|
||||
document::{DocumentSavedEventResult, Mode},
|
||||
document::{DocumentSavedEventFuture, DocumentSavedEventResult, Mode},
|
||||
graphics::{CursorKind, Rect},
|
||||
info::Info,
|
||||
input::KeyEvent,
|
||||
|
@ -9,7 +9,7 @@ use crate::{
|
|||
Document, DocumentId, View, ViewId,
|
||||
};
|
||||
|
||||
use futures_util::stream::{select_all::SelectAll, FuturesUnordered};
|
||||
use futures_util::stream::select_all::SelectAll;
|
||||
use futures_util::{future, StreamExt};
|
||||
use helix_lsp::Call;
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
@ -29,7 +29,7 @@ use tokio::{
|
|||
time::{sleep, Duration, Instant, Sleep},
|
||||
};
|
||||
|
||||
use anyhow::Error;
|
||||
use anyhow::{anyhow, Error};
|
||||
|
||||
pub use helix_core::diagnostic::Severity;
|
||||
pub use helix_core::register::Registers;
|
||||
|
@ -644,12 +644,20 @@ pub struct Breakpoint {
|
|||
pub log_message: Option<String>,
|
||||
}
|
||||
|
||||
use futures_util::stream::{Flatten, Once};
|
||||
|
||||
pub struct Editor {
|
||||
/// Current editing mode.
|
||||
pub mode: Mode,
|
||||
pub tree: Tree,
|
||||
pub next_document_id: DocumentId,
|
||||
pub documents: BTreeMap<DocumentId, Document>,
|
||||
|
||||
// We Flatten<> to resolve the inner DocumentSavedEventFuture. For that we need a stream of streams, hence the Once<>.
|
||||
// https://stackoverflow.com/a/66875668
|
||||
pub saves: HashMap<DocumentId, UnboundedSender<Once<DocumentSavedEventFuture>>>,
|
||||
pub save_queue: SelectAll<Flatten<UnboundedReceiverStream<Once<DocumentSavedEventFuture>>>>,
|
||||
|
||||
pub count: Option<std::num::NonZeroUsize>,
|
||||
pub selected_register: Option<char>,
|
||||
pub registers: Registers,
|
||||
|
@ -751,6 +759,8 @@ impl Editor {
|
|||
tree: Tree::new(area),
|
||||
next_document_id: DocumentId::default(),
|
||||
documents: BTreeMap::new(),
|
||||
saves: HashMap::new(),
|
||||
save_queue: SelectAll::new(),
|
||||
count: None,
|
||||
selected_register: None,
|
||||
macro_recording: None,
|
||||
|
@ -1083,6 +1093,12 @@ impl Editor {
|
|||
self.new_document(doc)
|
||||
};
|
||||
|
||||
let (save_sender, save_receiver) = tokio::sync::mpsc::unbounded_channel();
|
||||
self.saves.insert(id, save_sender);
|
||||
|
||||
let stream = UnboundedReceiverStream::new(save_receiver).flatten();
|
||||
self.save_queue.push(stream);
|
||||
|
||||
self.switch(id, action);
|
||||
Ok(id)
|
||||
}
|
||||
|
@ -1095,38 +1111,21 @@ impl Editor {
|
|||
self._refresh();
|
||||
}
|
||||
|
||||
pub async fn close_document(
|
||||
&mut self,
|
||||
doc_id: DocumentId,
|
||||
force: bool,
|
||||
) -> Result<(), CloseError> {
|
||||
pub fn close_document(&mut self, doc_id: DocumentId, force: bool) -> Result<(), CloseError> {
|
||||
let doc = match self.documents.get_mut(&doc_id) {
|
||||
Some(doc) => doc,
|
||||
None => return Err(CloseError::DoesNotExist),
|
||||
};
|
||||
|
||||
// flush out any pending writes first to clear the modified status
|
||||
if let Some(Err(err)) = doc.try_flush_saves().await {
|
||||
return Err(CloseError::SaveError(err));
|
||||
}
|
||||
|
||||
if !force && doc.is_modified() {
|
||||
return Err(CloseError::BufferModified(doc.display_name().into_owned()));
|
||||
}
|
||||
|
||||
if let Some(Err(err)) = doc.close().await {
|
||||
return Err(CloseError::SaveError(err));
|
||||
}
|
||||
// This will also disallow any follow-up writes
|
||||
self.saves.remove(&doc_id);
|
||||
|
||||
// Don't fail the whole write because the language server could not
|
||||
// acknowledge the close
|
||||
if let Some(language_server) = doc.language_server() {
|
||||
if let Err(err) = language_server
|
||||
.text_document_did_close(doc.identifier())
|
||||
.await
|
||||
{
|
||||
log::error!("Error closing doc in language server: {}", err);
|
||||
}
|
||||
// TODO: track error
|
||||
tokio::spawn(language_server.text_document_did_close(doc.identifier()));
|
||||
}
|
||||
|
||||
enum Action {
|
||||
|
@ -1188,6 +1187,28 @@ impl Editor {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn save<P: Into<PathBuf>>(
|
||||
&mut self,
|
||||
doc_id: DocumentId,
|
||||
path: Option<P>,
|
||||
force: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
// convert a channel of futures to pipe into main queue one by one
|
||||
// via stream.then() ? then push into main future
|
||||
|
||||
let path = path.map(|path| path.into());
|
||||
let doc = doc_mut!(self, &doc_id);
|
||||
let future = doc.save(path, force)?;
|
||||
// TODO: if no self.saves for that doc id then bail
|
||||
// bail!("saves are closed for this document!");
|
||||
use futures_util::stream;
|
||||
self.saves[&doc_id]
|
||||
.send(stream::once(Box::pin(future)))
|
||||
.map_err(|err| anyhow!("failed to send save event: {}", err))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn resize(&mut self, area: Rect) {
|
||||
if self.tree.resize(area) {
|
||||
self._refresh();
|
||||
|
@ -1307,16 +1328,10 @@ impl Editor {
|
|||
}
|
||||
|
||||
pub async fn wait_event(&mut self) -> EditorEvent {
|
||||
let mut saves: FuturesUnordered<_> = self
|
||||
.documents
|
||||
.values_mut()
|
||||
.map(Document::await_save)
|
||||
.collect();
|
||||
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
Some(Some(event)) = saves.next() => {
|
||||
Some(event) = self.save_queue.next() => {
|
||||
EditorEvent::DocumentSaved(event)
|
||||
}
|
||||
Some(config_event) = self.config_events.1.recv() => {
|
||||
|
|
Loading…
Add table
Reference in a new issue