openzeppelin_monitor/services/blockchain/transports/ws/
transport.rs1use async_trait::async_trait;
7use futures_util::{SinkExt, StreamExt};
8use reqwest_middleware::ClientWithMiddleware;
9use serde::Serialize;
10use serde_json::{json, Value};
11use std::{
12	sync::atomic::{AtomicU64, Ordering},
13	sync::Arc,
14	time::Duration,
15};
16use tokio::{net::TcpStream, sync::Mutex, time::timeout};
17use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
18
19use crate::{
20	models::Network,
21	services::blockchain::{
22		transports::{
23			ws::{
24				config::WsConfig, connection::WebSocketConnection,
25				endpoint_manager::EndpointManager,
26			},
27			BlockchainTransport, RotatingTransport,
28		},
29		TransportError,
30	},
31};
32
33#[derive(Clone, Debug)]
44pub struct WsTransportClient {
45	pub connection: Arc<Mutex<WebSocketConnection>>,
47	endpoint_manager: Arc<EndpointManager>,
49	config: WsConfig,
51	request_id_counter: Arc<AtomicU64>,
53}
54
55impl WsTransportClient {
56	pub async fn new(network: &Network, config: Option<WsConfig>) -> Result<Self, anyhow::Error> {
70		let config = config.unwrap_or_else(|| WsConfig::from_network(network));
71
72		let mut ws_urls: Vec<_> = network
74			.rpc_urls
75			.iter()
76			.filter(|rpc_url| rpc_url.type_ == "ws_rpc" && rpc_url.weight > 0)
77			.collect();
78
79		ws_urls.sort_by(|a, b| b.weight.cmp(&a.weight));
80
81		if ws_urls.is_empty() {
82			return Err(anyhow::anyhow!("No WebSocket URLs available"));
83		}
84
85		let mut active_url = None;
87		let mut fallback_urls = Vec::new();
88
89		for rpc_url in ws_urls {
90			let url = rpc_url.url.as_ref().to_string();
91			if active_url.is_none() {
92				match timeout(config.connection_timeout, connect_async(&url)).await {
93					Ok(Ok(_)) => {
94						active_url = Some(url.clone());
95						continue;
97					}
98					Ok(Err(e)) => {
99						tracing::warn!("WS connect failed for {}: {}", url, e);
100						}
102					Err(e) => {
103						tracing::warn!("WS connect timeout for {}: {}", url, e);
104						}
106				}
107			}
108			fallback_urls.push(url);
110		}
111
112		let active_url =
113			active_url.ok_or_else(|| anyhow::anyhow!("Failed to connect to any WebSocket URL"))?;
114		let endpoint_manager = Arc::new(EndpointManager::new(&config, &active_url, fallback_urls));
115		let connection = Arc::new(Mutex::new(WebSocketConnection::default()));
116
117		let client = Self {
118			connection,
119			endpoint_manager,
120			config,
121			request_id_counter: Arc::new(AtomicU64::new(1)),
122		};
123
124		client.connect().await?;
126
127		Ok(client)
128	}
129
130	async fn connect(&self) -> Result<(), anyhow::Error> {
135		let url = self.endpoint_manager.get_active_url().await?;
136		self.try_connect(&url).await
137	}
138
139	async fn send_raw_request<P>(
155		&self,
156		method: &str,
157		params: Option<P>,
158	) -> Result<Value, TransportError>
159	where
160		P: Into<Value> + Send + Clone + Serialize,
161	{
162		loop {
163			let mut connection = self.connection.lock().await;
164			if !connection.is_connected() {
165				return Err(TransportError::network("Not connected", None, None));
166			}
167			connection.update_activity();
168
169			let handle_connection_error = |connection: &mut WebSocketConnection| {
171				connection.is_healthy = false;
172				connection.stream = None;
173			};
174
175			let stream = match connection.stream.as_mut() {
176				Some(stream) => stream,
177				None => {
178					handle_connection_error(&mut connection);
179					drop(connection);
180					if !self.endpoint_manager.should_rotate().await {
181						return Err(TransportError::network("Not connected", None, None));
182					}
183					self.endpoint_manager.rotate_url(self).await.map_err(|e| {
184						TransportError::url_rotation("Failed to rotate URL", Some(e.into()), None)
185					})?;
186					continue;
187				}
188			};
189
190			let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
192			let request_body = json!({
193				"jsonrpc": "2.0",
194				"id": request_id,
195				"method": method,
196				"params": params.clone().map(|p| p.into())
197			});
198
199			if let Err(e) = stream
201				.send(Message::Text(request_body.to_string().into()))
202				.await
203			{
204				handle_connection_error(&mut connection);
205				drop(connection);
206				if !self.endpoint_manager.should_rotate().await {
207					return Err(TransportError::network(
208						format!("Failed to send request: {}", e),
209						None,
210						None,
211					));
212				}
213				self.endpoint_manager.rotate_url(self).await.map_err(|e| {
214					TransportError::url_rotation("Failed to rotate URL", Some(e.into()), None)
215				})?;
216				continue;
217			}
218
219			async fn handle_ping(
221				stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
222				data: Vec<u8>,
223			) -> Result<(), anyhow::Error> {
224				stream
225					.send(Message::Pong(data.into()))
226					.await
227					.map_err(|e| anyhow::anyhow!("Failed to send pong: {}", e))
228			}
229
230			async fn wait_for_response(
232				stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
233				timeout: Duration,
234			) -> Result<Message, anyhow::Error> {
235				tokio::time::timeout(timeout, stream.next())
236					.await
237					.map_err(|_| anyhow::anyhow!("Response timeout"))?
238					.ok_or_else(|| anyhow::anyhow!("Connection closed"))?
239					.map_err(|e| anyhow::anyhow!("WebSocket error: {}", e))
240			}
241
242			loop {
244				match wait_for_response(stream, self.config.message_timeout).await {
245					Ok(Message::Text(text)) => {
246						let response: Value = serde_json::from_str(&text).map_err(|e| {
248							TransportError::response_parse(
249								"Failed to parse response",
250								Some(e.into()),
251								None,
252							)
253						})?;
254
255						if let Some(response_id) = response.get("id").and_then(|v| v.as_u64()) {
257							if response_id == request_id {
258								return Ok(response);
260							}
261							continue;
263						}
264
265						return Ok(response);
267					}
268					Ok(Message::Ping(data)) => {
269						if let Err(e) = handle_ping(stream, data.to_vec()).await {
271							handle_connection_error(&mut connection);
272							drop(connection);
273							if !self.endpoint_manager.should_rotate().await {
274								return Err(TransportError::network(
275									"Failed to send pong",
276									Some(e.into()),
277									None,
278								));
279							}
280							self.endpoint_manager.rotate_url(self).await.map_err(|e| {
281								TransportError::url_rotation(
282									"Failed to rotate URL",
283									Some(e.into()),
284									None,
285								)
286							})?;
287							break;
288						}
289					}
290					Ok(_) => {
291						handle_connection_error(&mut connection);
292						drop(connection);
293						if !self.endpoint_manager.should_rotate().await {
294							return Err(TransportError::network(
295								"Unexpected message type",
296								None,
297								None,
298							));
299						}
300						self.endpoint_manager.rotate_url(self).await.map_err(|e| {
301							TransportError::url_rotation(
302								"Failed to rotate URL",
303								Some(e.into()),
304								None,
305							)
306						})?;
307						break;
308					}
309					Err(e) => {
310						handle_connection_error(&mut connection);
311						drop(connection);
312						if !self.endpoint_manager.should_rotate().await {
313							return Err(TransportError::network(
314								"Failed to handle response",
315								Some(e.into()),
316								None,
317							));
318						}
319						self.endpoint_manager.rotate_url(self).await.map_err(|e| {
320							TransportError::url_rotation(
321								"Failed to rotate URL",
322								Some(e.into()),
323								None,
324							)
325						})?;
326						break;
327					}
328				}
329			}
330		}
331	}
332}
333
334#[async_trait]
335impl BlockchainTransport for WsTransportClient {
336	async fn get_current_url(&self) -> String {
344		self.endpoint_manager.active_url.read().await.clone()
345	}
346
347	async fn send_raw_request<P>(
365		&self,
366		method: &str,
367		params: Option<P>,
368	) -> Result<Value, TransportError>
369	where
370		P: Into<Value> + Send + Clone + Serialize,
371	{
372		WsTransportClient::send_raw_request(self, method, params).await
373	}
374
375	fn update_endpoint_manager_client(
379		&mut self,
380		_client: ClientWithMiddleware,
381	) -> Result<(), anyhow::Error> {
382		Err(anyhow::anyhow!(
383			"`update_endpoint_manager_client` not implemented"
384		))
385	}
386}
387
388#[async_trait]
389impl RotatingTransport for WsTransportClient {
390	async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
401		let mut connection = self.connection.lock().await;
402
403		match timeout(self.config.connection_timeout, connect_async(url)).await {
404			Ok(Ok((ws_stream, _))) => {
405				connection.stream = Some(ws_stream);
406				connection.is_healthy = true;
407				connection.update_activity();
408				Ok(())
409			}
410			Ok(Err(e)) => {
411				connection.is_healthy = false;
412				Err(anyhow::anyhow!("Failed to connect: {}", e))
413			}
414			Err(_) => {
415				connection.is_healthy = false;
416				Err(anyhow::anyhow!("Connection timeout"))
417			}
418		}
419	}
420
421	async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
432		*self.endpoint_manager.active_url.write().await = url.to_string();
433		Ok(())
434	}
435}