openzeppelin_monitor/utils/monitor/
execution.rs1use crate::{
5 bootstrap::{get_contract_specs, has_active_monitors},
6 models::{BlockChainType, ScriptLanguage},
7 repositories::{
8 MonitorRepositoryTrait, MonitorService, NetworkRepositoryTrait, NetworkService,
9 TriggerRepositoryTrait,
10 },
11 services::{
12 blockchain::{BlockChainClient, ClientPoolTrait},
13 filter::{handle_match, FilterServiceTrait},
14 trigger::TriggerExecutionService,
15 },
16 utils::monitor::MonitorExecutionError,
17};
18use std::{collections::HashMap, path::Path, sync::Arc};
19use tokio::sync::Mutex;
20use tracing::{info, instrument};
21
22pub struct MonitorExecutionConfig<
36 M: MonitorRepositoryTrait<N, TR>,
37 N: NetworkRepositoryTrait + Send + Sync + 'static,
38 TR: TriggerRepositoryTrait + Send + Sync + 'static,
39 CP: ClientPoolTrait + Send + Sync + 'static,
40 FS: FilterServiceTrait + Send + Sync + 'static,
41> {
42 pub path: String,
43 pub network_slug: Option<String>,
44 pub block_number: Option<u64>,
45 pub monitor_service: Arc<Mutex<MonitorService<M, N, TR>>>,
46 pub network_service: Arc<Mutex<NetworkService<N>>>,
47 pub filter_service: Arc<FS>,
48 pub trigger_execution_service: Arc<TriggerExecutionService<TR>>,
49 pub active_monitors_trigger_scripts: HashMap<String, (ScriptLanguage, String)>,
50 pub client_pool: Arc<CP>,
51}
52pub type ExecutionResult<T> = std::result::Result<T, MonitorExecutionError>;
53
54#[instrument(skip_all)]
73#[allow(clippy::too_many_arguments)]
74pub async fn execute_monitor<
75 M: MonitorRepositoryTrait<N, TR>,
76 N: NetworkRepositoryTrait + Send + Sync + 'static,
77 TR: TriggerRepositoryTrait + Send + Sync + 'static,
78 CP: ClientPoolTrait + Send + Sync + 'static,
79 FS: FilterServiceTrait + Send + Sync + 'static,
80>(
81 config: MonitorExecutionConfig<M, N, TR, CP, FS>,
82) -> ExecutionResult<String> {
83 tracing::debug!("Loading monitor configuration");
84 let monitor = config
85 .monitor_service
86 .lock()
87 .await
88 .load_from_path(Some(Path::new(&config.path)), None, None)
89 .await
90 .map_err(|e| MonitorExecutionError::execution_error(e.to_string(), None, None))?;
91
92 tracing::debug!(monitor_name = %monitor.name, "Monitor loaded successfully");
93
94 let networks_for_monitor = if let Some(network_slug) = config.network_slug {
95 tracing::debug!(network = %network_slug, "Finding specific network");
96 let network = config
97 .network_service
98 .lock()
99 .await
100 .get(network_slug.as_str())
101 .ok_or_else(|| {
102 MonitorExecutionError::not_found(
103 format!("Network '{}' not found", network_slug),
104 None,
105 None,
106 )
107 })?;
108 vec![network.clone()]
109 } else {
110 tracing::debug!("Finding all active networks for monitor");
111 config
112 .network_service
113 .lock()
114 .await
115 .get_all()
116 .values()
117 .filter(|network| has_active_monitors(std::slice::from_ref(&monitor), &network.slug))
118 .cloned()
119 .collect()
120 };
121
122 tracing::debug!(
123 networks_count = networks_for_monitor.len(),
124 "Networks found for monitor"
125 );
126
127 let mut all_matches = Vec::new();
128 for network in networks_for_monitor {
129 tracing::debug!(
130 network_type = ?network.network_type,
131 network_slug = %network.slug,
132 "Processing network"
133 );
134
135 let contract_specs = get_contract_specs(
136 &config.client_pool,
137 &[(network.clone(), vec![monitor.clone()])],
138 )
139 .await;
140
141 let matches = match network.network_type {
142 BlockChainType::EVM => {
143 let client = config
144 .client_pool
145 .get_evm_client(&network)
146 .await
147 .map_err(|e| {
148 MonitorExecutionError::execution_error(
149 format!("Failed to get EVM client: {}", e),
150 None,
151 None,
152 )
153 })?;
154
155 let block_number = match config.block_number {
156 Some(block_number) => {
157 tracing::debug!(block = %block_number, "Using specified block number");
158 block_number
159 }
160 None => {
161 let latest = client.get_latest_block_number().await.map_err(|e| {
162 MonitorExecutionError::execution_error(e.to_string(), None, None)
163 })?;
164 tracing::debug!(block = %latest, "Using latest block number");
165 latest
166 }
167 };
168
169 tracing::debug!(block = %block_number, "Fetching block");
170 let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
171 MonitorExecutionError::execution_error(
172 format!("Failed to get block {}: {}", block_number, e),
173 None,
174 None,
175 )
176 })?;
177
178 let block = blocks.first().ok_or_else(|| {
179 MonitorExecutionError::not_found(
180 format!("Block {} not found", block_number),
181 None,
182 None,
183 )
184 })?;
185
186 tracing::debug!(block = %block_number, "Filtering block");
187 config
188 .filter_service
189 .filter_block(
190 &*client,
191 &network,
192 block,
193 std::slice::from_ref(&monitor),
194 Some(&contract_specs),
195 )
196 .await
197 .map_err(|e| {
198 MonitorExecutionError::execution_error(
199 format!("Failed to filter block: {}", e),
200 None,
201 None,
202 )
203 })?
204 }
205 BlockChainType::Stellar => {
206 let client = config
207 .client_pool
208 .get_stellar_client(&network)
209 .await
210 .map_err(|e| {
211 MonitorExecutionError::execution_error(
212 format!("Failed to get Stellar client: {}", e),
213 None,
214 None,
215 )
216 })?;
217
218 let block_number = match config.block_number {
220 Some(block_number) => block_number,
221 None => client.get_latest_block_number().await.map_err(|e| {
222 MonitorExecutionError::execution_error(e.to_string(), None, None)
223 })?,
224 };
225
226 let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
227 MonitorExecutionError::execution_error(
228 format!("Failed to get block {}: {}", block_number, e),
229 None,
230 None,
231 )
232 })?;
233
234 let block = blocks.first().ok_or_else(|| {
235 MonitorExecutionError::not_found(
236 format!("Block {} not found", block_number),
237 None,
238 None,
239 )
240 })?;
241
242 config
243 .filter_service
244 .filter_block(
245 &*client,
246 &network,
247 block,
248 std::slice::from_ref(&monitor),
249 Some(&contract_specs),
250 )
251 .await
252 .map_err(|e| {
253 MonitorExecutionError::execution_error(
254 format!("Failed to filter block: {}", e),
255 None,
256 None,
257 )
258 })?
259 }
260 BlockChainType::Midnight => {
261 let client = config
262 .client_pool
263 .get_midnight_client(&network)
264 .await
265 .map_err(|e| {
266 MonitorExecutionError::execution_error(
267 format!("Failed to get Midnight client: {}", e),
268 None,
269 None,
270 )
271 })?;
272
273 let block_number = match config.block_number {
275 Some(block_number) => block_number,
276 None => client.get_latest_block_number().await.map_err(|e| {
277 MonitorExecutionError::execution_error(e.to_string(), None, None)
278 })?,
279 };
280
281 let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
282 MonitorExecutionError::execution_error(
283 format!("Failed to get block {}: {}", block_number, e),
284 None,
285 None,
286 )
287 })?;
288
289 let block = blocks.first().ok_or_else(|| {
290 MonitorExecutionError::not_found(
291 format!("Block {} not found", block_number),
292 None,
293 None,
294 )
295 })?;
296
297 config
298 .filter_service
299 .filter_block(
300 &*client,
301 &network,
302 block,
303 std::slice::from_ref(&monitor),
304 Some(&contract_specs),
305 )
306 .await
307 .map_err(|e| {
308 MonitorExecutionError::execution_error(
309 format!("Failed to filter block: {}", e),
310 None,
311 None,
312 )
313 })?
314 }
315 };
316
317 tracing::debug!(matches_count = matches.len(), "Found matches for network");
318 all_matches.extend(matches);
319 }
320
321 for match_result in all_matches.clone() {
323 let result = handle_match(
324 match_result,
325 &*config.trigger_execution_service,
326 &config.active_monitors_trigger_scripts,
327 )
328 .await;
329 match result {
330 Ok(_result) => info!("Successfully sent notifications for match"),
331 Err(e) => {
332 tracing::error!("Error sending notifications: {}", e);
333 continue;
334 }
335 };
336 }
337
338 tracing::debug!(total_matches = all_matches.len(), "Serializing results");
339 let json_matches = serde_json::to_string(&all_matches).map_err(|e| {
340 MonitorExecutionError::execution_error(
341 format!("Failed to serialize matches: {}", e),
342 None,
343 None,
344 )
345 })?;
346
347 tracing::debug!("Monitor execution completed successfully");
348 Ok(json_matches)
349}