openzeppelin_monitor/services/blockchain/transports/ws/
endpoint_manager.rs1use std::sync::Arc;
11use tokio::sync::{Mutex, RwLock};
12use tokio::time::timeout;
13
14use crate::services::blockchain::transports::{ws::config::WsConfig, RotatingTransport};
15
16#[derive(Clone, Debug)]
27pub struct EndpointManager {
28	pub active_url: Arc<RwLock<String>>,
30	pub fallback_urls: Arc<RwLock<Vec<String>>>,
32	rotation_lock: Arc<Mutex<()>>,
34	config: WsConfig,
36}
37
38impl EndpointManager {
39	pub fn new(config: &WsConfig, active_url: &str, fallback_urls: Vec<String>) -> Self {
52		Self {
53			active_url: Arc::new(RwLock::new(active_url.to_string())),
54			fallback_urls: Arc::new(RwLock::new(fallback_urls)),
55			rotation_lock: Arc::new(Mutex::new(())),
56			config: config.clone(),
57		}
58	}
59
60	pub async fn rotate_url<T: RotatingTransport>(
72		&self,
73		transport: &T,
74	) -> Result<(), anyhow::Error> {
75		let _guard = self.rotation_lock.lock().await;
76		let current_active = self.active_url.read().await.clone();
77		let mut attempts = 0;
78
79		while attempts < self.config.max_reconnect_attempts {
80			let new_url = {
81				let mut fallback_urls = self.fallback_urls.write().await;
82				if fallback_urls.is_empty() {
83					return Err(anyhow::anyhow!("No fallback URLs available"));
84				}
85
86				let idx = fallback_urls.iter().position(|url| url != ¤t_active);
88
89				match idx {
90					Some(pos) => fallback_urls.remove(pos),
91					None => {
92						return Err(anyhow::anyhow!("No fallback URLs available"));
93					}
94				}
95			};
96
97			match timeout(
99				self.config.connection_timeout,
100				transport.try_connect(&new_url),
101			)
102			.await
103			{
104				Ok(Ok(_)) => {
105					transport.update_client(&new_url).await?;
106					{
107						let mut active_url = self.active_url.write().await;
108						let mut fallback_urls = self.fallback_urls.write().await;
109						tracing::debug!(
110							"Successful rotation - from: {}, to: {}",
111							current_active,
112							new_url
113						);
114						fallback_urls.push(current_active);
115						*active_url = new_url;
116					}
117					return Ok(());
118				}
119				Ok(Err(e)) => {
120					let mut fallback_urls = self.fallback_urls.write().await;
121					fallback_urls.push(new_url);
122					tracing::warn!("Failed to connect to fallback URL: {}", e);
123				}
124				Err(_) => {
125					let mut fallback_urls = self.fallback_urls.write().await;
126					fallback_urls.push(new_url);
127					tracing::warn!("Connection timeout during rotation");
128				}
129			}
130
131			attempts += 1;
132			if attempts < self.config.max_reconnect_attempts {
133				tokio::time::sleep(self.config.reconnect_timeout).await;
134			}
135		}
136
137		Err(anyhow::anyhow!(
138			"Failed to reconnect after {} attempts",
139			self.config.max_reconnect_attempts
140		))
141	}
142
143	pub async fn get_active_url(&self) -> Result<String, anyhow::Error> {
149		let url = self.active_url.read().await;
150		if url.is_empty() {
151			Err(anyhow::anyhow!("No active URL set"))
152		} else {
153			Ok(url.clone())
154		}
155	}
156
157	pub async fn should_rotate(&self) -> bool {
164		let fallback_urls = self.fallback_urls.read().await;
165		!fallback_urls.is_empty()
166	}
167}