openzeppelin_monitor/utils/monitor/
execution.rs

1//! Execution monitor module
2//!
3//! This module provides functionality to execute monitors against specific block numbers on blockchain networks.
4use crate::{
5	bootstrap::{get_contract_specs, has_active_monitors},
6	models::{BlockChainType, ScriptLanguage},
7	repositories::{
8		MonitorRepositoryTrait, MonitorService, NetworkRepositoryTrait, NetworkService,
9		TriggerRepositoryTrait,
10	},
11	services::{
12		blockchain::{BlockChainClient, ClientPoolTrait},
13		filter::{handle_match, FilterServiceTrait},
14		trigger::TriggerExecutionService,
15	},
16	utils::monitor::MonitorExecutionError,
17};
18use std::{collections::HashMap, path::Path, sync::Arc};
19use tokio::sync::Mutex;
20use tracing::{info, instrument};
21
22/// Configuration for executing a monitor
23///
24/// # Arguments
25///
26/// * `path` - The path to the monitor to execute
27/// * `network_slug` - The network slug to execute the monitor against
28/// * `block_number` - The block number to execute the monitor against
29/// * `monitor_service` - The monitor service to use
30/// * `network_service` - The network service to use
31/// * `filter_service` - The filter service to use
32/// * `trigger_execution_service` - The trigger execution service to use
33/// * `active_monitors_trigger_scripts` - The active monitors trigger scripts to use
34/// * `client_pool` - The client pool to use
35pub struct MonitorExecutionConfig<
36	M: MonitorRepositoryTrait<N, TR>,
37	N: NetworkRepositoryTrait + Send + Sync + 'static,
38	TR: TriggerRepositoryTrait + Send + Sync + 'static,
39	CP: ClientPoolTrait + Send + Sync + 'static,
40	FS: FilterServiceTrait + Send + Sync + 'static,
41> {
42	pub path: String,
43	pub network_slug: Option<String>,
44	pub block_number: Option<u64>,
45	pub monitor_service: Arc<Mutex<MonitorService<M, N, TR>>>,
46	pub network_service: Arc<Mutex<NetworkService<N>>>,
47	pub filter_service: Arc<FS>,
48	pub trigger_execution_service: Arc<TriggerExecutionService<TR>>,
49	pub active_monitors_trigger_scripts: HashMap<String, (ScriptLanguage, String)>,
50	pub client_pool: Arc<CP>,
51}
52pub type ExecutionResult<T> = std::result::Result<T, MonitorExecutionError>;
53
54/// Executes a monitor against a specific block number on a blockchain network.
55///
56/// This function allows testing monitors by running them against historical blocks.
57/// It supports both EVM and Stellar networks, retrieving the block data and applying
58/// the monitor's filters to check for matches.
59///
60/// # Arguments
61///
62/// * `monitor_name` - The name of the monitor to execute
63/// * `network_slug` - The network identifier to run the monitor against
64/// * `block_number` - The specific block number to analyze
65/// * `active_monitors` - List of currently active monitors
66/// * `network_service` - The network service to use
67/// * `filter_service` - The filter service to use
68/// * `client_pool` - The client pool to use
69///
70/// # Returns
71/// * `Result<String, ExecutionError>` - JSON string containing matches or error
72#[instrument(skip_all)]
73#[allow(clippy::too_many_arguments)]
74pub async fn execute_monitor<
75	M: MonitorRepositoryTrait<N, TR>,
76	N: NetworkRepositoryTrait + Send + Sync + 'static,
77	TR: TriggerRepositoryTrait + Send + Sync + 'static,
78	CP: ClientPoolTrait + Send + Sync + 'static,
79	FS: FilterServiceTrait + Send + Sync + 'static,
80>(
81	config: MonitorExecutionConfig<M, N, TR, CP, FS>,
82) -> ExecutionResult<String> {
83	tracing::debug!("Loading monitor configuration");
84	let monitor = config
85		.monitor_service
86		.lock()
87		.await
88		.load_from_path(Some(Path::new(&config.path)), None, None)
89		.await
90		.map_err(|e| MonitorExecutionError::execution_error(e.to_string(), None, None))?;
91
92	tracing::debug!(monitor_name = %monitor.name, "Monitor loaded successfully");
93
94	let networks_for_monitor = if let Some(network_slug) = config.network_slug {
95		tracing::debug!(network = %network_slug, "Finding specific network");
96		let network = config
97			.network_service
98			.lock()
99			.await
100			.get(network_slug.as_str())
101			.ok_or_else(|| {
102				MonitorExecutionError::not_found(
103					format!("Network '{}' not found", network_slug),
104					None,
105					None,
106				)
107			})?;
108		vec![network.clone()]
109	} else {
110		tracing::debug!("Finding all active networks for monitor");
111		config
112			.network_service
113			.lock()
114			.await
115			.get_all()
116			.values()
117			.filter(|network| has_active_monitors(std::slice::from_ref(&monitor), &network.slug))
118			.cloned()
119			.collect()
120	};
121
122	tracing::debug!(
123		networks_count = networks_for_monitor.len(),
124		"Networks found for monitor"
125	);
126
127	let mut all_matches = Vec::new();
128	for network in networks_for_monitor {
129		tracing::debug!(
130			network_type = ?network.network_type,
131			network_slug = %network.slug,
132			"Processing network"
133		);
134
135		let contract_specs = get_contract_specs(
136			&config.client_pool,
137			&[(network.clone(), vec![monitor.clone()])],
138		)
139		.await;
140
141		let matches = match network.network_type {
142			BlockChainType::EVM => {
143				let client = config
144					.client_pool
145					.get_evm_client(&network)
146					.await
147					.map_err(|e| {
148						MonitorExecutionError::execution_error(
149							format!("Failed to get EVM client: {}", e),
150							None,
151							None,
152						)
153					})?;
154
155				let block_number = match config.block_number {
156					Some(block_number) => {
157						tracing::debug!(block = %block_number, "Using specified block number");
158						block_number
159					}
160					None => {
161						let latest = client.get_latest_block_number().await.map_err(|e| {
162							MonitorExecutionError::execution_error(e.to_string(), None, None)
163						})?;
164						tracing::debug!(block = %latest, "Using latest block number");
165						latest
166					}
167				};
168
169				tracing::debug!(block = %block_number, "Fetching block");
170				let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
171					MonitorExecutionError::execution_error(
172						format!("Failed to get block {}: {}", block_number, e),
173						None,
174						None,
175					)
176				})?;
177
178				let block = blocks.first().ok_or_else(|| {
179					MonitorExecutionError::not_found(
180						format!("Block {} not found", block_number),
181						None,
182						None,
183					)
184				})?;
185
186				tracing::debug!(block = %block_number, "Filtering block");
187				config
188					.filter_service
189					.filter_block(
190						&*client,
191						&network,
192						block,
193						std::slice::from_ref(&monitor),
194						Some(&contract_specs),
195					)
196					.await
197					.map_err(|e| {
198						MonitorExecutionError::execution_error(
199							format!("Failed to filter block: {}", e),
200							None,
201							None,
202						)
203					})?
204			}
205			BlockChainType::Stellar => {
206				let client = config
207					.client_pool
208					.get_stellar_client(&network)
209					.await
210					.map_err(|e| {
211						MonitorExecutionError::execution_error(
212							format!("Failed to get Stellar client: {}", e),
213							None,
214							None,
215						)
216					})?;
217
218				// If block number is not provided, get the latest block number
219				let block_number = match config.block_number {
220					Some(block_number) => block_number,
221					None => client.get_latest_block_number().await.map_err(|e| {
222						MonitorExecutionError::execution_error(e.to_string(), None, None)
223					})?,
224				};
225
226				let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
227					MonitorExecutionError::execution_error(
228						format!("Failed to get block {}: {}", block_number, e),
229						None,
230						None,
231					)
232				})?;
233
234				let block = blocks.first().ok_or_else(|| {
235					MonitorExecutionError::not_found(
236						format!("Block {} not found", block_number),
237						None,
238						None,
239					)
240				})?;
241
242				config
243					.filter_service
244					.filter_block(
245						&*client,
246						&network,
247						block,
248						std::slice::from_ref(&monitor),
249						Some(&contract_specs),
250					)
251					.await
252					.map_err(|e| {
253						MonitorExecutionError::execution_error(
254							format!("Failed to filter block: {}", e),
255							None,
256							None,
257						)
258					})?
259			}
260			BlockChainType::Midnight => {
261				let client = config
262					.client_pool
263					.get_midnight_client(&network)
264					.await
265					.map_err(|e| {
266						MonitorExecutionError::execution_error(
267							format!("Failed to get Midnight client: {}", e),
268							None,
269							None,
270						)
271					})?;
272
273				// If block number is not provided, get the latest block number
274				let block_number = match config.block_number {
275					Some(block_number) => block_number,
276					None => client.get_latest_block_number().await.map_err(|e| {
277						MonitorExecutionError::execution_error(e.to_string(), None, None)
278					})?,
279				};
280
281				let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
282					MonitorExecutionError::execution_error(
283						format!("Failed to get block {}: {}", block_number, e),
284						None,
285						None,
286					)
287				})?;
288
289				let block = blocks.first().ok_or_else(|| {
290					MonitorExecutionError::not_found(
291						format!("Block {} not found", block_number),
292						None,
293						None,
294					)
295				})?;
296
297				config
298					.filter_service
299					.filter_block(
300						&*client,
301						&network,
302						block,
303						std::slice::from_ref(&monitor),
304						Some(&contract_specs),
305					)
306					.await
307					.map_err(|e| {
308						MonitorExecutionError::execution_error(
309							format!("Failed to filter block: {}", e),
310							None,
311							None,
312						)
313					})?
314			}
315		};
316
317		tracing::debug!(matches_count = matches.len(), "Found matches for network");
318		all_matches.extend(matches);
319	}
320
321	// Send notifications for each match
322	for match_result in all_matches.clone() {
323		let result = handle_match(
324			match_result,
325			&*config.trigger_execution_service,
326			&config.active_monitors_trigger_scripts,
327		)
328		.await;
329		match result {
330			Ok(_result) => info!("Successfully sent notifications for match"),
331			Err(e) => {
332				tracing::error!("Error sending notifications: {}", e);
333				continue;
334			}
335		};
336	}
337
338	tracing::debug!(total_matches = all_matches.len(), "Serializing results");
339	let json_matches = serde_json::to_string(&all_matches).map_err(|e| {
340		MonitorExecutionError::execution_error(
341			format!("Failed to serialize matches: {}", e),
342			None,
343			None,
344		)
345	})?;
346
347	tracing::debug!("Monitor execution completed successfully");
348	Ok(json_matches)
349}