openzeppelin_monitor/services/blockchain/transports/http/
transport.rs1use anyhow::Context;
12use async_trait::async_trait;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15use serde_json::{json, Value};
16use std::{sync::Arc, time::Duration};
17use url::Url;
18
19use crate::{
20	models::Network,
21	services::blockchain::transports::{
22		http::endpoint_manager::EndpointManager, BlockchainTransport, RotatingTransport,
23		TransientErrorRetryStrategy, TransportError,
24	},
25	utils::http::{create_retryable_http_client, RetryConfig},
26};
27
28#[derive(Clone, Debug)]
38pub struct HttpTransportClient {
39	pub client: ClientWithMiddleware,
41	endpoint_manager: EndpointManager,
43	test_connection_payload: Option<String>,
45}
46
47impl HttpTransportClient {
48	pub async fn new(
61		network: &Network,
62		test_connection_payload: Option<String>,
63	) -> Result<Self, anyhow::Error> {
64		let mut rpc_urls: Vec<_> = network
65			.rpc_urls
66			.iter()
67			.filter(|rpc_url| rpc_url.type_ == "rpc" && rpc_url.weight > 0)
68			.collect();
69
70		rpc_urls.sort_by(|a, b| b.weight.cmp(&a.weight));
71		let http_retry_config = RetryConfig::default();
74		let base_http_client = Arc::new(
76			reqwest::ClientBuilder::new()
77				.pool_idle_timeout(Duration::from_secs(90))
78				.pool_max_idle_per_host(32)
79				.timeout(Duration::from_secs(30))
80				.connect_timeout(Duration::from_secs(20))
81				.use_rustls_tls()
82				.build()
83				.context("Failed to create base HTTP client")?,
84		);
85		let retryable_client = create_retryable_http_client(
90			&http_retry_config,
91			(*base_http_client).clone(),
92			Some(TransientErrorRetryStrategy),
93		);
94		for rpc_url in rpc_urls.iter() {
95			let url = match Url::parse(rpc_url.url.as_ref()) {
96				Ok(url) => url,
97				Err(_) => continue,
98			};
99			let test_request = if let Some(test_payload) = &test_connection_payload {
100				serde_json::from_str(test_payload)
101					.context("Failed to parse test payload as JSON")?
102			} else {
103				json!({
104					"jsonrpc": "2.0",
105					"id": 1,
106					"method": "net_version",
107					"params": []
108				})
109			};
110			let request_result = retryable_client
112				.post(url.clone())
113				.json(&test_request)
114				.send()
115				.await;
116			match request_result {
117				Ok(response) => {
118					if !response.status().is_success() {
120						continue;
122					}
123					let fallback_urls: Vec<String> = rpc_urls
125						.iter()
126						.filter(|url| url.url != rpc_url.url)
127						.map(|url| url.url.as_ref().to_string())
128						.collect();
129
130					return Ok(Self {
132						client: retryable_client.clone(),
133						endpoint_manager: EndpointManager::new(
134							retryable_client,
135							rpc_url.url.as_ref(),
136							fallback_urls,
137						),
138						test_connection_payload,
139					});
140				}
141				Err(_) => {
142					continue;
144				}
145			}
146		}
147
148		Err(anyhow::anyhow!("All RPC URLs failed to connect"))
149	}
150}
151
152#[async_trait]
153impl BlockchainTransport for HttpTransportClient {
154	async fn get_current_url(&self) -> String {
162		self.endpoint_manager.active_url.read().await.clone()
163	}
164
165	async fn send_raw_request<P>(
183		&self,
184		method: &str,
185		params: Option<P>,
186	) -> Result<Value, TransportError>
187	where
188		P: Into<Value> + Send + Clone + Serialize,
189	{
190		self.endpoint_manager
191			.send_raw_request(self, method, params)
192			.await
193	}
194
195	fn update_endpoint_manager_client(
200		&mut self,
201		client: ClientWithMiddleware,
202	) -> Result<(), anyhow::Error> {
203		self.endpoint_manager.update_client(client);
204		Ok(())
205	}
206}
207
208#[async_trait]
209impl RotatingTransport for HttpTransportClient {
210	async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
221		let url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
222
223		let test_request = if let Some(test_payload) = &self.test_connection_payload {
224			serde_json::from_str(test_payload).context("Failed to parse test payload as JSON")?
225		} else {
226			json!({
227				"jsonrpc": "2.0",
228				"id": 1,
229				"method": "net_version",
230				"params": []
231			})
232		};
233
234		let request = self.client.post(url.clone()).json(&test_request);
235
236		match request.send().await {
237			Ok(response) => {
238				let status = response.status();
239				if !status.is_success() {
240					Err(anyhow::anyhow!(
241						"Failed to connect to {}: {}",
242						url,
243						status.as_u16()
244					))
245				} else {
246					Ok(())
247				}
248			}
249			Err(e) => Err(anyhow::anyhow!("Failed to connect to {}: {}", url, e)),
250		}
251	}
252
253	async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
264		let parsed_url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
265		let normalized_url = parsed_url.as_str().trim_end_matches('/');
267
268		let mut active_url = self.endpoint_manager.active_url.write().await;
271		*active_url = normalized_url.to_string();
272		Ok(())
273	}
274}