refactor response processing
This commit is contained in:
parent
9678df1c62
commit
59d6b92e5b
1 changed files with 37 additions and 41 deletions
|
@ -1,6 +1,6 @@
|
|||
use crate::{Error, Result};
|
||||
use anyhow::Context;
|
||||
use log::{error, info};
|
||||
use log::{error, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
|
@ -162,54 +162,50 @@ impl Transport {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn process_response(res: Response) -> Result<Response> {
|
||||
match res.success {
|
||||
true => {
|
||||
info!(
|
||||
"<- DAP success ({}, in response to {})",
|
||||
res.seq, res.request_seq
|
||||
);
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
false => {
|
||||
error!(
|
||||
"<- DAP error {:?} ({:?}) for command #{} {}",
|
||||
res.message, res.body, res.request_seq, res.command
|
||||
);
|
||||
|
||||
Err(Error::Other(anyhow::format_err!("{:?}", res.body)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_server_message(
|
||||
&self,
|
||||
client_tx: &UnboundedSender<Payload>,
|
||||
msg: Payload,
|
||||
) -> Result<()> {
|
||||
match msg {
|
||||
Payload::Response(Response {
|
||||
ref success,
|
||||
ref seq,
|
||||
request_seq,
|
||||
ref command,
|
||||
ref message,
|
||||
ref body,
|
||||
..
|
||||
}) => {
|
||||
let result = match success {
|
||||
true => {
|
||||
info!("<- DAP success ({}, in response to {})", seq, request_seq);
|
||||
if let Payload::Response(val) = msg {
|
||||
Ok(val)
|
||||
} else {
|
||||
unreachable!();
|
||||
}
|
||||
Payload::Response(res) => {
|
||||
let request_seq = res.request_seq;
|
||||
let tx = self.pending_requests.lock().await.remove(&request_seq);
|
||||
|
||||
match tx {
|
||||
Some(tx) => match tx.send(Self::process_response(res)).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => error!(
|
||||
"Tried sending response into a closed channel (id={:?}), original request likely timed out",
|
||||
request_seq
|
||||
),
|
||||
}
|
||||
false => {
|
||||
error!(
|
||||
"<- DAP error {:?} ({:?}) for command #{} {}",
|
||||
message, body, request_seq, command
|
||||
);
|
||||
|
||||
Err(Error::Other(anyhow::format_err!("{:?}", body)))
|
||||
None => {
|
||||
warn!("Response to nonexistent request #{}", res.request_seq);
|
||||
client_tx.send(Payload::Response(res)).expect("Failed to send");
|
||||
}
|
||||
};
|
||||
|
||||
let tx = self
|
||||
.pending_requests
|
||||
.lock()
|
||||
.await
|
||||
.remove(&request_seq)
|
||||
.expect("pending_request with id not found!");
|
||||
|
||||
match tx.send(result).await {
|
||||
Ok(_) => (),
|
||||
Err(_) => error!(
|
||||
"Tried sending response into a closed channel (id={:?}), original request likely timed out",
|
||||
request_seq
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue