openzeppelin_monitor/services/blockchain/transports/ws/endpoint_manager.rs

//! Manages the rotation of blockchain WebSocket RPC endpoints.
//!
//! This module provides functionality for managing WebSocket connections to blockchain nodes,
//! including:
//! - Automatic failover between multiple RPC endpoints
//! - Weight-based URL selection (callers supply URLs pre-sorted by weight)
//! - Connection health monitoring
//! - Thread-safe URL rotation

10use std::sync::Arc;
11use tokio::sync::{Mutex, RwLock};
12use tokio::time::timeout;
13
14use crate::services::blockchain::transports::{ws::config::WsConfig, RotatingTransport};
15
16/// Manages WebSocket RPC endpoint rotation and failover
17///
18/// This struct provides thread-safe management of WebSocket connections to blockchain nodes,
19/// handling automatic failover between multiple endpoints based on their weights and health.
20///
21/// # Fields
22/// * `active_url` - The currently active WebSocket endpoint URL
23/// * `fallback_urls` - List of fallback URLs to use when the active URL fails
24/// * `rotation_lock` - Mutex to ensure thread-safe URL rotation
25/// * `config` - Configuration settings for WebSocket connections
26#[derive(Clone, Debug)]
27pub struct EndpointManager {
28	/// The currently active WebSocket endpoint URL
29	pub active_url: Arc<RwLock<String>>,
30	/// List of fallback URLs to use when the active URL fails
31	pub fallback_urls: Arc<RwLock<Vec<String>>>,
32	/// Mutex to ensure thread-safe URL rotation
33	rotation_lock: Arc<Mutex<()>>,
34	/// Configuration settings for WebSocket connections
35	config: WsConfig,
36}
37
38impl EndpointManager {
39	/// Creates a new WebSocket endpoint manager
40	///
41	/// Initializes the endpoint manager with a primary URL and a list of fallback URLs.
42	/// The URLs should be pre-sorted by weight, with the highest weight URL as the active one.
43	///
44	/// # Arguments
45	/// * `config` - WebSocket configuration settings
46	/// * `active_url` - The initial active WebSocket URL
47	/// * `fallback_urls` - List of fallback URLs, pre-sorted by weight
48	///
49	/// # Returns
50	/// A new `EndpointManager` instance
51	pub fn new(config: &WsConfig, active_url: &str, fallback_urls: Vec<String>) -> Self {
52		Self {
53			active_url: Arc::new(RwLock::new(active_url.to_string())),
54			fallback_urls: Arc::new(RwLock::new(fallback_urls)),
55			rotation_lock: Arc::new(Mutex::new(())),
56			config: config.clone(),
57		}
58	}
59
60	/// Rotates to the next available WebSocket URL
61	///
62	/// Attempts to connect to a different URL from the fallback list. If successful,
63	/// updates the active URL and moves the old active URL to the fallback list.
64	///
65	/// # Arguments
66	/// * `transport` - The transport client implementing the `RotatingTransport` trait
67	///
68	/// # Returns
69	/// * `Ok(())` if rotation was successful
70	/// * `Err` if no fallback URLs are available or connection fails
71	pub async fn rotate_url<T: RotatingTransport>(
72		&self,
73		transport: &T,
74	) -> Result<(), anyhow::Error> {
75		let _guard = self.rotation_lock.lock().await;
76		let current_active = self.active_url.read().await.clone();
77		let mut attempts = 0;
78
79		while attempts < self.config.max_reconnect_attempts {
80			let new_url = {
81				let mut fallback_urls = self.fallback_urls.write().await;
82				if fallback_urls.is_empty() {
83					return Err(anyhow::anyhow!("No fallback URLs available"));
84				}
85
86				// Find first URL that's different from current
87				let idx = fallback_urls.iter().position(|url| url != &current_active);
88
89				match idx {
90					Some(pos) => fallback_urls.remove(pos),
91					None => {
92						return Err(anyhow::anyhow!("No fallback URLs available"));
93					}
94				}
95			};
96
97			// Use connection timeout from config
98			match timeout(
99				self.config.connection_timeout,
100				transport.try_connect(&new_url),
101			)
102			.await
103			{
104				Ok(Ok(_)) => {
105					transport.update_client(&new_url).await?;
106					{
107						let mut active_url = self.active_url.write().await;
108						let mut fallback_urls = self.fallback_urls.write().await;
109						tracing::debug!(
110							"Successful rotation - from: {}, to: {}",
111							current_active,
112							new_url
113						);
114						fallback_urls.push(current_active);
115						*active_url = new_url;
116					}
117					return Ok(());
118				}
119				Ok(Err(e)) => {
120					let mut fallback_urls = self.fallback_urls.write().await;
121					fallback_urls.push(new_url);
122					tracing::warn!("Failed to connect to fallback URL: {}", e);
123				}
124				Err(_) => {
125					let mut fallback_urls = self.fallback_urls.write().await;
126					fallback_urls.push(new_url);
127					tracing::warn!("Connection timeout during rotation");
128				}
129			}
130
131			attempts += 1;
132			if attempts < self.config.max_reconnect_attempts {
133				tokio::time::sleep(self.config.reconnect_timeout).await;
134			}
135		}
136
137		Err(anyhow::anyhow!(
138			"Failed to reconnect after {} attempts",
139			self.config.max_reconnect_attempts
140		))
141	}
142
143	/// Retrieves the currently active WebSocket URL
144	///
145	/// # Returns
146	/// * `Ok(String)` containing the active URL
147	/// * `Err` if no active URL is set
148	pub async fn get_active_url(&self) -> Result<String, anyhow::Error> {
149		let url = self.active_url.read().await;
150		if url.is_empty() {
151			Err(anyhow::anyhow!("No active URL set"))
152		} else {
153			Ok(url.clone())
154		}
155	}
156
157	/// Checks if URL rotation should be attempted
158	///
159	/// Determines if there are any fallback URLs available for rotation.
160	///
161	/// # Returns
162	/// `true` if rotation should be attempted, `false` otherwise
163	pub async fn should_rotate(&self) -> bool {
164		let fallback_urls = self.fallback_urls.read().await;
165		!fallback_urls.is_empty()
166	}
167}