openzeppelin_monitor/services/blockchain/transports/http/
transport.rs

1//! HTTP transport implementation for blockchain interactions.
2//!
3//! This module provides a generic HTTP client implementation for interacting with blockchain nodes
4//! via JSON-RPC, supporting:
5//! - Multiple RPC endpoints with automatic failover
6//! - Configurable retry policies
7//! - Authentication via bearer tokens
8//! - Connection health checks
9//! - Endpoint rotation for high availability
10
11use anyhow::Context;
12use async_trait::async_trait;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15use serde_json::{json, Value};
16use std::{sync::Arc, time::Duration};
17use url::Url;
18
19use crate::{
20	models::Network,
21	services::blockchain::transports::{
22		http::endpoint_manager::EndpointManager, BlockchainTransport, RotatingTransport,
23		TransientErrorRetryStrategy, TransportError,
24	},
25	utils::http::{create_retryable_http_client, RetryConfig},
26};
27
28/// Basic HTTP transport client for blockchain interactions
29///
30/// This client provides a foundation for making JSON-RPC requests to blockchain nodes
31/// with built-in support for:
32/// - Connection pooling and reuse
33/// - Automatic endpoint rotation on failure
34/// - Configurable retry policies
35///
36/// The client is thread-safe and can be shared across multiple tasks.
37#[derive(Clone, Debug)]
38pub struct HttpTransportClient {
39	/// Retryable HTTP client for making requests
40	pub client: ClientWithMiddleware,
41	/// Manages RPC endpoint rotation and request handling for high availability
42	endpoint_manager: EndpointManager,
43	/// The stringified JSON RPC payload to use for testing the connection
44	test_connection_payload: Option<String>,
45}
46
47impl HttpTransportClient {
48	/// Creates a new HTTP transport client with automatic endpoint management
49	///
50	/// This constructor attempts to connect to available endpoints in order of their
51	/// weight until a successful connection is established. It configures default
52	/// timeout and retry policies suitable for blockchain interactions.
53	///
54	/// # Arguments
55	/// * `network` - Network configuration containing RPC URLs, weights, and other details
56	/// * `test_connection_payload` - Optional JSON RPC payload to test the connection (default is net_version)
57	///
58	/// # Returns
59	/// * `Result<Self, anyhow::Error>` - New client instance or connection error
60	pub async fn new(
61		network: &Network,
62		test_connection_payload: Option<String>,
63	) -> Result<Self, anyhow::Error> {
64		let mut rpc_urls: Vec<_> = network
65			.rpc_urls
66			.iter()
67			.filter(|rpc_url| rpc_url.type_ == "rpc" && rpc_url.weight > 0)
68			.collect();
69
70		rpc_urls.sort_by(|a, b| b.weight.cmp(&a.weight));
71		// Create a retry policy with default settings
72		// Shared config for endpoint manager and test connection
73		let http_retry_config = RetryConfig::default();
74		// Create the base HTTP client
75		let base_http_client = Arc::new(
76			reqwest::ClientBuilder::new()
77				.pool_idle_timeout(Duration::from_secs(90))
78				.pool_max_idle_per_host(32)
79				.timeout(Duration::from_secs(30))
80				.connect_timeout(Duration::from_secs(20))
81				.use_rustls_tls()
82				.build()
83				.context("Failed to create base HTTP client")?,
84		);
85		// Create a retryable HTTP client with the base client and retry policy
86		// Shared across:
87		// - EndpointManager for handling endpoint rotation
88		// - Connection testing for verifying endpoint availability
89		let retryable_client = create_retryable_http_client(
90			&http_retry_config,
91			(*base_http_client).clone(),
92			Some(TransientErrorRetryStrategy),
93		);
94		for rpc_url in rpc_urls.iter() {
95			let url = match Url::parse(rpc_url.url.as_ref()) {
96				Ok(url) => url,
97				Err(_) => continue,
98			};
99			let test_request = if let Some(test_payload) = &test_connection_payload {
100				serde_json::from_str(test_payload)
101					.context("Failed to parse test payload as JSON")?
102			} else {
103				json!({
104					"jsonrpc": "2.0",
105					"id": 1,
106					"method": "net_version",
107					"params": []
108				})
109			};
110			// Attempt to connect to the endpoint
111			let request_result = retryable_client
112				.post(url.clone())
113				.json(&test_request)
114				.send()
115				.await;
116			match request_result {
117				Ok(response) => {
118					// Check if the response indicates an error status (4xx or 5xx)
119					if !response.status().is_success() {
120						// Skip this URL if we got an error status
121						continue;
122					}
123					// Create list of fallback URLs (all URLs except the current one)
124					let fallback_urls: Vec<String> = rpc_urls
125						.iter()
126						.filter(|url| url.url != rpc_url.url)
127						.map(|url| url.url.as_ref().to_string())
128						.collect();
129
130					// Successfully connected - create and return the client
131					return Ok(Self {
132						client: retryable_client.clone(),
133						endpoint_manager: EndpointManager::new(
134							retryable_client,
135							rpc_url.url.as_ref(),
136							fallback_urls,
137						),
138						test_connection_payload,
139					});
140				}
141				Err(_) => {
142					// Connection failed - try next URL
143					continue;
144				}
145			}
146		}
147
148		Err(anyhow::anyhow!("All RPC URLs failed to connect"))
149	}
150}
151
152#[async_trait]
153impl BlockchainTransport for HttpTransportClient {
154	/// Retrieves the currently active RPC endpoint URL
155	///
156	/// This method is useful for monitoring which endpoint is currently in use,
157	/// especially in scenarios with multiple failover URLs.
158	///
159	/// # Returns
160	/// * `String` - The URL of the currently active endpoint
161	async fn get_current_url(&self) -> String {
162		self.endpoint_manager.active_url.read().await.clone()
163	}
164
165	/// Sends a JSON-RPC request to the blockchain node
166	///
167	/// This method handles the formatting of the JSON-RPC request, including:
168	/// - Adding required JSON-RPC 2.0 fields
169	/// - Generating unique request IDs
170	/// - Converting parameters to the correct format
171	/// - Handling authentication
172	///
173	/// # Arguments
174	/// * `method` - The JSON-RPC method name to call
175	/// * `params` - Optional parameters for the method call
176	///
177	/// # Returns
178	/// * `Result<Value, TransportError>` - JSON response or error with context
179	///
180	/// # Type Parameters
181	/// * `P` - Parameter type that can be serialized to JSON
182	async fn send_raw_request<P>(
183		&self,
184		method: &str,
185		params: Option<P>,
186	) -> Result<Value, TransportError>
187	where
188		P: Into<Value> + Send + Clone + Serialize,
189	{
190		self.endpoint_manager
191			.send_raw_request(self, method, params)
192			.await
193	}
194
195	/// Update endpoint manager with a new client
196	///
197	/// # Arguments
198	/// * `client` - The new client to use for the endpoint manager
199	fn update_endpoint_manager_client(
200		&mut self,
201		client: ClientWithMiddleware,
202	) -> Result<(), anyhow::Error> {
203		self.endpoint_manager.update_client(client);
204		Ok(())
205	}
206}
207
208#[async_trait]
209impl RotatingTransport for HttpTransportClient {
210	/// Tests connectivity to a specific RPC endpoint
211	///
212	/// Performs a basic JSON-RPC request to verify the endpoint is responsive
213	/// and correctly handling requests.
214	///
215	/// # Arguments
216	/// * `url` - The URL to test
217	///
218	/// # Returns
219	/// * `Result<(), anyhow::Error>` - Success or detailed error message
220	async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
221		let url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
222
223		let test_request = if let Some(test_payload) = &self.test_connection_payload {
224			serde_json::from_str(test_payload).context("Failed to parse test payload as JSON")?
225		} else {
226			json!({
227				"jsonrpc": "2.0",
228				"id": 1,
229				"method": "net_version",
230				"params": []
231			})
232		};
233
234		let request = self.client.post(url.clone()).json(&test_request);
235
236		match request.send().await {
237			Ok(response) => {
238				let status = response.status();
239				if !status.is_success() {
240					Err(anyhow::anyhow!(
241						"Failed to connect to {}: {}",
242						url,
243						status.as_u16()
244					))
245				} else {
246					Ok(())
247				}
248			}
249			Err(e) => Err(anyhow::anyhow!("Failed to connect to {}: {}", url, e)),
250		}
251	}
252
253	/// Updates the active endpoint URL
254	///
255	/// This method is called when rotating to a new endpoint, typically
256	/// after a failure of the current endpoint.
257	///
258	/// # Arguments
259	/// * `url` - The new URL to use for subsequent requests
260	///
261	/// # Returns
262	/// * `Result<(), anyhow::Error>` - Success or error status
263	async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
264		let parsed_url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
265		// Normalize the URL by trimming trailing slash if present
266		let normalized_url = parsed_url.as_str().trim_end_matches('/');
267
268		// For HTTP client, we don't need to update the client itself
269		// We just need to update the endpoint manager's active URL
270		let mut active_url = self.endpoint_manager.active_url.write().await;
271		*active_url = normalized_url.to_string();
272		Ok(())
273	}
274}