openzeppelin_monitor/services/blockchain/transports/ws/
transport.rs

1//! Websocket transport implementation for blockchain interactions.
2//!
3//! This module provides a WebSocket client implementation for interacting with blockchain nodes
4//! via WebSocket protocol, supporting connection checks and failover.
5
6use async_trait::async_trait;
7use futures_util::{SinkExt, StreamExt};
8use reqwest_middleware::ClientWithMiddleware;
9use serde::Serialize;
10use serde_json::{json, Value};
11use std::{
12	sync::atomic::{AtomicU64, Ordering},
13	sync::Arc,
14	time::Duration,
15};
16use tokio::{net::TcpStream, sync::Mutex, time::timeout};
17use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
18
19use crate::{
20	models::Network,
21	services::blockchain::{
22		transports::{
23			ws::{
24				config::WsConfig, connection::WebSocketConnection,
25				endpoint_manager::EndpointManager,
26			},
27			BlockchainTransport, RotatingTransport,
28		},
29		TransportError,
30	},
31};
32
33/// Basic WebSocket transport client for blockchain interactions
34///
35/// This client provides a foundation for making WebSocket connections to blockchain nodes
36/// with built-in support for:
37/// - Connection pooling and reuse
38/// - Automatic endpoint rotation on failure
39/// - Configurable timeouts and reconnection policies
40/// - Heartbeat monitoring
41///
42/// The client is thread-safe and can be shared across multiple tasks.
43#[derive(Clone, Debug)]
44pub struct WsTransportClient {
45	/// WebSocket connection state and stream
46	pub connection: Arc<Mutex<WebSocketConnection>>,
47	/// Manages WebSocket endpoint rotation and request handling
48	endpoint_manager: Arc<EndpointManager>,
49	/// Configuration settings for WebSocket connections
50	config: WsConfig,
51	/// Counter for generating unique request IDs
52	request_id_counter: Arc<AtomicU64>,
53}
54
55impl WsTransportClient {
56	/// Creates a new WebSocket transport client with automatic endpoint management
57	///
58	/// This constructor:
59	/// 1. Filters and sorts WebSocket RPC URLs by weight
60	/// 2. Tests each URL's connectivity with timeout
61	/// 3. Uses the first working URL as active
62	/// 4. Adds any additional URLs as fallbacks
63	///
64	/// # Arguments
65	/// * `network` - Network configuration containing RPC URLs, weights, and other details
66	///
67	/// # Returns
68	/// * `Result<Self, anyhow::Error>` - New client instance or connection error
69	pub async fn new(network: &Network, config: Option<WsConfig>) -> Result<Self, anyhow::Error> {
70		let config = config.unwrap_or_else(|| WsConfig::from_network(network));
71
72		// Filter and sort WebSocket URLs by weight
73		let mut ws_urls: Vec<_> = network
74			.rpc_urls
75			.iter()
76			.filter(|rpc_url| rpc_url.type_ == "ws_rpc" && rpc_url.weight > 0)
77			.collect();
78
79		ws_urls.sort_by(|a, b| b.weight.cmp(&a.weight));
80
81		if ws_urls.is_empty() {
82			return Err(anyhow::anyhow!("No WebSocket URLs available"));
83		}
84
85		// Find first working URL and use rest as fallbacks
86		let mut active_url = None;
87		let mut fallback_urls = Vec::new();
88
89		for rpc_url in ws_urls {
90			let url = rpc_url.url.as_ref().to_string();
91			if active_url.is_none() {
92				match timeout(config.connection_timeout, connect_async(&url)).await {
93					Ok(Ok(_)) => {
94						active_url = Some(url.clone());
95						// Picked as active; do not push to fallbacks
96						continue;
97					}
98					Ok(Err(e)) => {
99						tracing::warn!("WS connect failed for {}: {}", url, e);
100						// try next URL
101					}
102					Err(e) => {
103						tracing::warn!("WS connect timeout for {}: {}", url, e);
104						// try next URL
105					}
106				}
107			}
108			// Either already have active, or this one failed and remains a fallback candidate
109			fallback_urls.push(url);
110		}
111
112		let active_url =
113			active_url.ok_or_else(|| anyhow::anyhow!("Failed to connect to any WebSocket URL"))?;
114		let endpoint_manager = Arc::new(EndpointManager::new(&config, &active_url, fallback_urls));
115		let connection = Arc::new(Mutex::new(WebSocketConnection::default()));
116
117		let client = Self {
118			connection,
119			endpoint_manager,
120			config,
121			request_id_counter: Arc::new(AtomicU64::new(1)),
122		};
123
124		// Initial connection
125		client.connect().await?;
126
127		Ok(client)
128	}
129
130	/// Establishes initial connection to the active endpoint
131	///
132	/// # Returns
133	/// * `Result<(), anyhow::Error>` - Success or connection error
134	async fn connect(&self) -> Result<(), anyhow::Error> {
135		let url = self.endpoint_manager.get_active_url().await?;
136		self.try_connect(&url).await
137	}
138
139	/// Sends a JSON-RPC request via WebSocket
140	///
141	/// This method handles:
142	/// - Connection state verification
143	/// - Request formatting
144	/// - Message sending with timeout
145	/// - Response parsing
146	/// - Automatic URL rotation on failure
147	///
148	/// # Arguments
149	/// * `method` - The RPC method to call
150	/// * `params` - Optional parameters for the method call
151	///
152	/// # Returns
153	/// * `Result<Value, TransportError>` - JSON response or error
154	async fn send_raw_request<P>(
155		&self,
156		method: &str,
157		params: Option<P>,
158	) -> Result<Value, TransportError>
159	where
160		P: Into<Value> + Send + Clone + Serialize,
161	{
162		loop {
163			let mut connection = self.connection.lock().await;
164			if !connection.is_connected() {
165				return Err(TransportError::network("Not connected", None, None));
166			}
167			connection.update_activity();
168
169			// Helper function to handle connection errors
170			let handle_connection_error = |connection: &mut WebSocketConnection| {
171				connection.is_healthy = false;
172				connection.stream = None;
173			};
174
175			let stream = match connection.stream.as_mut() {
176				Some(stream) => stream,
177				None => {
178					handle_connection_error(&mut connection);
179					drop(connection);
180					if !self.endpoint_manager.should_rotate().await {
181						return Err(TransportError::network("Not connected", None, None));
182					}
183					self.endpoint_manager.rotate_url(self).await.map_err(|e| {
184						TransportError::url_rotation("Failed to rotate URL", Some(e.into()), None)
185					})?;
186					continue;
187				}
188			};
189
190			// Generate unique request ID
191			let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
192			let request_body = json!({
193				"jsonrpc": "2.0",
194				"id": request_id,
195				"method": method,
196				"params": params.clone().map(|p| p.into())
197			});
198
199			// Try to send the request
200			if let Err(e) = stream
201				.send(Message::Text(request_body.to_string().into()))
202				.await
203			{
204				handle_connection_error(&mut connection);
205				drop(connection);
206				if !self.endpoint_manager.should_rotate().await {
207					return Err(TransportError::network(
208						format!("Failed to send request: {}", e),
209						None,
210						None,
211					));
212				}
213				self.endpoint_manager.rotate_url(self).await.map_err(|e| {
214					TransportError::url_rotation("Failed to rotate URL", Some(e.into()), None)
215				})?;
216				continue;
217			}
218
219			// Helper function to handle ping messages
220			async fn handle_ping(
221				stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
222				data: Vec<u8>,
223			) -> Result<(), anyhow::Error> {
224				stream
225					.send(Message::Pong(data.into()))
226					.await
227					.map_err(|e| anyhow::anyhow!("Failed to send pong: {}", e))
228			}
229
230			// Helper function to wait for response
231			async fn wait_for_response(
232				stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
233				timeout: Duration,
234			) -> Result<Message, anyhow::Error> {
235				tokio::time::timeout(timeout, stream.next())
236					.await
237					.map_err(|_| anyhow::anyhow!("Response timeout"))?
238					.ok_or_else(|| anyhow::anyhow!("Connection closed"))?
239					.map_err(|e| anyhow::anyhow!("WebSocket error: {}", e))
240			}
241
242			// Wait for response with timeout, retrying until we get our specific response ID
243			loop {
244				match wait_for_response(stream, self.config.message_timeout).await {
245					Ok(Message::Text(text)) => {
246						// Parse the response
247						let response: Value = serde_json::from_str(&text).map_err(|e| {
248							TransportError::response_parse(
249								"Failed to parse response",
250								Some(e.into()),
251								None,
252							)
253						})?;
254
255						// Check if this response is for our request
256						if let Some(response_id) = response.get("id").and_then(|v| v.as_u64()) {
257							if response_id == request_id {
258								// This is our response!
259								return Ok(response);
260							}
261							// Not our response, continue waiting
262							continue;
263						}
264
265						// No ID in response
266						return Ok(response);
267					}
268					Ok(Message::Ping(data)) => {
269						// Respond to ping and wait for actual response
270						if let Err(e) = handle_ping(stream, data.to_vec()).await {
271							handle_connection_error(&mut connection);
272							drop(connection);
273							if !self.endpoint_manager.should_rotate().await {
274								return Err(TransportError::network(
275									"Failed to send pong",
276									Some(e.into()),
277									None,
278								));
279							}
280							self.endpoint_manager.rotate_url(self).await.map_err(|e| {
281								TransportError::url_rotation(
282									"Failed to rotate URL",
283									Some(e.into()),
284									None,
285								)
286							})?;
287							break;
288						}
289					}
290					Ok(_) => {
291						handle_connection_error(&mut connection);
292						drop(connection);
293						if !self.endpoint_manager.should_rotate().await {
294							return Err(TransportError::network(
295								"Unexpected message type",
296								None,
297								None,
298							));
299						}
300						self.endpoint_manager.rotate_url(self).await.map_err(|e| {
301							TransportError::url_rotation(
302								"Failed to rotate URL",
303								Some(e.into()),
304								None,
305							)
306						})?;
307						break;
308					}
309					Err(e) => {
310						handle_connection_error(&mut connection);
311						drop(connection);
312						if !self.endpoint_manager.should_rotate().await {
313							return Err(TransportError::network(
314								"Failed to handle response",
315								Some(e.into()),
316								None,
317							));
318						}
319						self.endpoint_manager.rotate_url(self).await.map_err(|e| {
320							TransportError::url_rotation(
321								"Failed to rotate URL",
322								Some(e.into()),
323								None,
324							)
325						})?;
326						break;
327					}
328				}
329			}
330		}
331	}
332}
333
334#[async_trait]
335impl BlockchainTransport for WsTransportClient {
336	/// Retrieves the currently active RPC endpoint URL
337	///
338	/// This method is useful for monitoring which endpoint is currently in use,
339	/// especially in scenarios with multiple failover URLs.
340	///
341	/// # Returns
342	/// * `String` - The URL of the currently active endpoint
343	async fn get_current_url(&self) -> String {
344		self.endpoint_manager.active_url.read().await.clone()
345	}
346
347	/// Sends a JSON-RPC request to the blockchain node via WebSocket
348	///
349	/// This method handles the formatting of the JSON-RPC request, including:
350	/// - Adding required JSON-RPC 2.0 fields
351	/// - Converting parameters to the correct format
352	/// - Connection health checks
353	/// - Activity tracking
354	///
355	/// # Arguments
356	/// * `method` - The JSON-RPC method name to call
357	/// * `params` - Optional parameters for the method call
358	///
359	/// # Returns
360	/// * `Result<Value, anyhow::Error>` - JSON response or error with context
361	///
362	/// # Type Parameters
363	/// * `P` - Parameter type that can be serialized to JSON
364	async fn send_raw_request<P>(
365		&self,
366		method: &str,
367		params: Option<P>,
368	) -> Result<Value, TransportError>
369	where
370		P: Into<Value> + Send + Clone + Serialize,
371	{
372		WsTransportClient::send_raw_request(self, method, params).await
373	}
374
375	/// Update endpoint manager with a new client
376	///
377	/// Note: Not applicable for WebSocket transport
378	fn update_endpoint_manager_client(
379		&mut self,
380		_client: ClientWithMiddleware,
381	) -> Result<(), anyhow::Error> {
382		Err(anyhow::anyhow!(
383			"`update_endpoint_manager_client` not implemented"
384		))
385	}
386}
387
388#[async_trait]
389impl RotatingTransport for WsTransportClient {
390	/// Tests connectivity to a specific WebSocket endpoint
391	///
392	/// Attempts to establish a WebSocket connection with timeout and updates
393	/// the connection state accordingly.
394	///
395	/// # Arguments
396	/// * `url` - The WebSocket URL to test
397	///
398	/// # Returns
399	/// * `Result<(), anyhow::Error>` - Success or detailed error message
400	async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
401		let mut connection = self.connection.lock().await;
402
403		match timeout(self.config.connection_timeout, connect_async(url)).await {
404			Ok(Ok((ws_stream, _))) => {
405				connection.stream = Some(ws_stream);
406				connection.is_healthy = true;
407				connection.update_activity();
408				Ok(())
409			}
410			Ok(Err(e)) => {
411				connection.is_healthy = false;
412				Err(anyhow::anyhow!("Failed to connect: {}", e))
413			}
414			Err(_) => {
415				connection.is_healthy = false;
416				Err(anyhow::anyhow!("Connection timeout"))
417			}
418		}
419	}
420
421	/// Updates the active endpoint URL
422	///
423	/// This method is called when rotating to a new endpoint, typically
424	/// after a failure of the current endpoint.
425	///
426	/// # Arguments
427	/// * `url` - The new URL to use for subsequent connections
428	///
429	/// # Returns
430	/// * `Result<(), anyhow::Error>` - Success or error status
431	async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
432		*self.endpoint_manager.active_url.write().await = url.to_string();
433		Ok(())
434	}
435}