From 46f3c69f06cc55f36bcc6244a9f96c2481836dea Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= <blaz@mxxn.io>
Date: Thu, 2 Sep 2021 13:55:08 +0900
Subject: [PATCH] lsp: Don't send notifications until initialize completes

Then send open events for all documents with the LSP attached.
---
 helix-lsp/src/lib.rs          | 86 +++++++++++++++++------------------
 helix-lsp/src/transport.rs    | 29 +++++++++++-
 helix-term/src/application.rs | 31 +++++++++++++
 helix-view/src/editor.rs      |  5 +-
 4 files changed, 105 insertions(+), 46 deletions(-)

diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs
index e10c107b..7357c885 100644
--- a/helix-lsp/src/lib.rs
+++ b/helix-lsp/src/lib.rs
@@ -226,6 +226,8 @@ impl MethodCall {
 
 #[derive(Debug, PartialEq, Clone)]
 pub enum Notification {
+    // we inject this notification to signal the LSP is ready
+    Initialized,
     PublishDiagnostics(lsp::PublishDiagnosticsParams),
     ShowMessage(lsp::ShowMessageParams),
     LogMessage(lsp::LogMessageParams),
@@ -237,6 +239,7 @@ impl Notification {
         use lsp::notification::Notification as _;
 
         let notification = match method {
+            lsp::notification::Initialized::METHOD => Self::Initialized,
             lsp::notification::PublishDiagnostics::METHOD => {
                 let params: lsp::PublishDiagnosticsParams = params
                     .parse()
@@ -294,7 +297,7 @@ impl Registry {
         }
     }
 
-    pub fn get_by_id(&mut self, id: usize) -> Option<&Client> {
+    pub fn get_by_id(&self, id: usize) -> Option<&Client> {
         self.inner
             .values()
             .find(|(client_id, _)| client_id == &id)
@@ -302,55 +305,52 @@ impl Registry {
     }
 
     pub fn get(&mut self, language_config: &LanguageConfiguration) -> Result<Arc<Client>> {
-        if let Some(config) = &language_config.language_server {
-            // avoid borrow issues
-            let inner = &mut self.inner;
-            let s_incoming = &mut self.incoming;
+        let config = match &language_config.language_server {
+            Some(config) => config,
+            None => return Err(Error::LspNotDefined),
+        };
 
-            match inner.entry(language_config.scope.clone()) {
-                Entry::Occupied(entry) => Ok(entry.get().1.clone()),
-                Entry::Vacant(entry) => {
-                    // initialize a new client
-                    let id = self.counter.fetch_add(1, Ordering::Relaxed);
-                    let (client, incoming, initialize_notify) = Client::start(
-                        &config.command,
-                        &config.args,
-                        serde_json::from_str(language_config.config.as_deref().unwrap_or("")).ok(),
-                        id,
-                    )?;
-                    s_incoming.push(UnboundedReceiverStream::new(incoming));
-                    let client = Arc::new(client);
+        match self.inner.entry(language_config.scope.clone()) {
+            Entry::Occupied(entry) => Ok(entry.get().1.clone()),
+            Entry::Vacant(entry) => {
+                // initialize a new client
+                let id = self.counter.fetch_add(1, Ordering::Relaxed);
+                let (client, incoming, initialize_notify) = Client::start(
+                    &config.command,
+                    &config.args,
+                    serde_json::from_str(language_config.config.as_deref().unwrap_or("")).ok(),
+                    id,
+                )?;
+                self.incoming.push(UnboundedReceiverStream::new(incoming));
+                let client = Arc::new(client);
 
-                    let _client = client.clone();
-                    // Initialize the client asynchronously
-                    tokio::spawn(async move {
-                        use futures_util::TryFutureExt;
-                        let value = _client
-                            .capabilities
-                            .get_or_try_init(|| {
-                                _client
-                                    .initialize()
-                                    .map_ok(|response| response.capabilities)
-                            })
-                            .await;
+                // Initialize the client asynchronously
+                let _client = client.clone();
+                tokio::spawn(async move {
+                    use futures_util::TryFutureExt;
+                    let value = _client
+                        .capabilities
+                        .get_or_try_init(|| {
+                            _client
+                                .initialize()
+                                .map_ok(|response| response.capabilities)
+                        })
+                        .await;
 
-                        value.expect("failed to initialize capabilities");
+                    value.expect("failed to initialize capabilities");
 
-                        // next up, notify<initialized>
-                        _client
-                            .notify::<lsp::notification::Initialized>(lsp::InitializedParams {})
-                            .await
-                            .unwrap();
+                    // next up, notify<initialized>
+                    _client
+                        .notify::<lsp::notification::Initialized>(lsp::InitializedParams {})
+                        .await
+                        .unwrap();
 
-                        initialize_notify.notify_one();
-                    });
+                    initialize_notify.notify_one();
+                });
 
-                    entry.insert((id, client.clone()));
-                    Ok(client)
-                }
+                entry.insert((id, client.clone()));
+                Ok(client)
             }
-        } else {
-            Err(Error::LspNotDefined)
         }
     }
 
diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs
index 071c5b93..cf7e66a8 100644
--- a/helix-lsp/src/transport.rs
+++ b/helix-lsp/src/transport.rs
@@ -64,11 +64,16 @@ impl Transport {
 
         let transport = Arc::new(transport);
 
-        tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx));
+        tokio::spawn(Self::recv(
+            transport.clone(),
+            server_stdout,
+            client_tx.clone(),
+        ));
         tokio::spawn(Self::err(transport.clone(), server_stderr));
         tokio::spawn(Self::send(
             transport,
             server_stdin,
+            client_tx,
             client_rx,
             notify.clone(),
         ));
@@ -269,6 +274,7 @@ impl Transport {
     async fn send(
         transport: Arc<Self>,
         mut server_stdin: BufWriter<ChildStdin>,
+        mut client_tx: UnboundedSender<(usize, jsonrpc::Call)>,
         mut client_rx: UnboundedReceiver<Payload>,
         initialize_notify: Arc<Notify>,
     ) {
@@ -303,6 +309,22 @@ impl Transport {
                 _ = initialize_notify.notified() => { // TODO: notified is technically not cancellation safe
                     // server successfully initialized
                     is_pending = false;
+
+                    use lsp_types::notification::Notification;
+                    // Hack: inject an initialized notification so we trigger code that needs to happen after init
+                    let notification = ServerMessage::Call(jsonrpc::Call::Notification(jsonrpc::Notification {
+                        jsonrpc: None,
+
+                        method: lsp_types::notification::Initialized::METHOD.to_string(),
+                        params: jsonrpc::Params::None,
+                    }));
+                    match transport.process_server_message(&mut client_tx, notification).await {
+                        Ok(_) => {}
+                        Err(err) => {
+                            error!("err: <- {:?}", err);
+                        }
+                    }
+
                     // drain the pending queue and send payloads to server
                     for msg in pending_messages.drain(..) {
                         log::info!("Draining pending message {:?}", msg);
@@ -317,6 +339,11 @@ impl Transport {
                 msg = client_rx.recv() => {
                     if let Some(msg) = msg {
                         if is_pending && !is_initialize(&msg) {
+                            // ignore notifications
+                            if let Payload::Notification(_) = msg {
+                                continue;
+                            }
+
                             log::info!("Language server not initialized, delaying request");
                             pending_messages.push(msg);
                         } else {
diff --git a/helix-term/src/application.rs b/helix-term/src/application.rs
index d3b65a4f..e21c5504 100644
--- a/helix-term/src/application.rs
+++ b/helix-term/src/application.rs
@@ -275,6 +275,37 @@ impl Application {
                 };
 
                 match notification {
+                    Notification::Initialized => {
+                        let language_server =
+                            match self.editor.language_servers.get_by_id(server_id) {
+                                Some(language_server) => language_server,
+                                None => {
+                                    warn!("can't find language server with id `{}`", server_id);
+                                    return;
+                                }
+                            };
+
+                        let docs = self.editor.documents().filter(|doc| {
+                            doc.language_server().map(|server| server.id()) == Some(server_id)
+                        });
+
+                        // trigger textDocument/didOpen for docs that are already open
+                        for doc in docs {
+                            // TODO: extract and share with editor.open
+                            let language_id = doc
+                                .language()
+                                .and_then(|s| s.split('.').last()) // source.rust
+                                .map(ToOwned::to_owned)
+                                .unwrap_or_default();
+
+                            tokio::spawn(language_server.text_document_did_open(
+                                doc.url().unwrap(),
+                                doc.version(),
+                                doc.text(),
+                                language_id,
+                            ));
+                        }
+                    }
                     Notification::PublishDiagnostics(params) => {
                         let path = params.uri.to_file_path().unwrap();
                         let doc = self.editor.document_by_path_mut(&path);
diff --git a/helix-view/src/editor.rs b/helix-view/src/editor.rs
index c8abd5b5..3d2d4a87 100644
--- a/helix-view/src/editor.rs
+++ b/helix-view/src/editor.rs
@@ -255,20 +255,21 @@ impl Editor {
                 .and_then(|language| self.language_servers.get(language).ok());
 
             if let Some(language_server) = language_server {
-                doc.set_language_server(Some(language_server.clone()));
-
                 let language_id = doc
                     .language()
                     .and_then(|s| s.split('.').last()) // source.rust
                     .map(ToOwned::to_owned)
                     .unwrap_or_default();
 
+                // TODO: this now races with on_init code if the init happens too quickly
                 tokio::spawn(language_server.text_document_did_open(
                     doc.url().unwrap(),
                     doc.version(),
                     doc.text(),
                     language_id,
                 ));
+
+                doc.set_language_server(Some(language_server));
             }
 
             let id = self.documents.insert(doc);