openzeppelin_monitor/bootstrap/
mod.rs

1//! Bootstrap module for initializing services and creating handlers.
2//!
3//! This module provides functions to initialize the necessary services and create handlers for
4//! processing blocks and triggers. It also includes helper functions for filtering and processing
5//! monitors and networks.
6//!
7//! # Services
8//! - `FilterService`: Handles filtering of blockchain data
9//! - `TriggerExecutionService`: Manages trigger execution
10//! - `NotificationService`: Handles notifications
11//!
12//! # Handlers
13//! - `create_block_handler`: Creates a block handler function that processes new blocks from the
14//!   blockchain
15//! - `create_trigger_handler`: Creates a trigger handler function that processes trigger events
16//!   from the block processing pipeline
17
18use futures::future::BoxFuture;
19use std::{collections::HashMap, error::Error, sync::Arc};
20use tokio::sync::{watch, Mutex};
21
22use crate::{
23	models::{
24		BlockChainType, BlockType, ContractSpec, Monitor, MonitorMatch, Network, ProcessedBlock,
25		ScriptLanguage, TriggerConditions,
26	},
27	repositories::{
28		MonitorRepositoryTrait, MonitorService, NetworkRepositoryTrait, NetworkService,
29		TriggerRepositoryTrait, TriggerService,
30	},
31	services::{
32		blockchain::{BlockChainClient, BlockFilterFactory, ClientPoolTrait},
33		filter::{evm_helpers, handle_match, stellar_helpers, FilterService},
34		notification::NotificationService,
35		trigger::{
36			ScriptError, ScriptExecutorFactory, TriggerError, TriggerExecutionService,
37			TriggerExecutionServiceTrait,
38		},
39	},
40	utils::normalize_string,
41};
42
/// Module-wide result alias whose error side is a boxed `dyn Error`.
///
/// `ServiceResult` (the return type of `initialize_services`) is built on top of this alias.
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
45
/// Return type of `initialize_services`.
///
/// On success the tuple holds, in order: the filter service, the trigger execution service, the
/// list of monitors, the networks keyed by `String`, and the monitor, network and trigger
/// services each wrapped in `Arc<Mutex<_>>`.
type ServiceResult<M, N, T> = Result<(
	Arc<FilterService>,
	Arc<TriggerExecutionService<T>>,
	Vec<Monitor>,
	HashMap<String, Network>,
	Arc<Mutex<MonitorService<M, N, T>>>,
	Arc<Mutex<NetworkService<N>>>,
	Arc<Mutex<TriggerService<T>>>,
)>;
55
56/// Initializes all required services for the blockchain monitor.
57///
58/// # Returns
59/// Returns a tuple containing:
60/// - FilterService: Handles filtering of blockchain data
61/// - TriggerExecutionService: Manages trigger execution
62/// - `Vec<Monitor>`: List of active monitors
63/// - `HashMap<String, Network>`: Available networks indexed by slug
64/// - `Arc<Mutex<M>>`: Data access for monitor configs
65/// - `Arc<Mutex<N>>`: Data access for network configs
66/// - `Arc<Mutex<T>>`: Data access for trigger configs
67/// # Errors
68/// Returns an error if any service initialization fails
69pub async fn initialize_services<M, N, T>(
70	monitor_service: Option<MonitorService<M, N, T>>,
71	network_service: Option<NetworkService<N>>,
72	trigger_service: Option<TriggerService<T>>,
73) -> ServiceResult<M, N, T>
74where
75	M: MonitorRepositoryTrait<N, T> + Send + Sync + 'static,
76	N: NetworkRepositoryTrait + Send + Sync + 'static,
77	T: TriggerRepositoryTrait + Send + Sync + 'static,
78{
79	let network_service = match network_service {
80		Some(service) => service,
81		None => {
82			let repository = N::new(None).await?;
83			NetworkService::<N>::new_with_repository(repository)?
84		}
85	};
86
87	let trigger_service = match trigger_service {
88		Some(service) => service,
89		None => {
90			let repository = T::new(None).await?;
91			TriggerService::<T>::new_with_repository(repository)?
92		}
93	};
94
95	let monitor_service = match monitor_service {
96		Some(service) => service,
97		None => {
98			let repository = M::new(
99				None,
100				Some(network_service.clone()),
101				Some(trigger_service.clone()),
102			)
103			.await?;
104			MonitorService::<M, N, T>::new_with_repository(repository)?
105		}
106	};
107
108	let notification_service = NotificationService::new();
109
110	let filter_service = Arc::new(FilterService::new());
111	let trigger_execution_service = Arc::new(TriggerExecutionService::new(
112		trigger_service.clone(),
113		notification_service,
114	));
115
116	let monitors = monitor_service.get_all();
117	let active_monitors = filter_active_monitors(monitors);
118	let networks = network_service.get_all();
119
120	Ok((
121		filter_service,
122		trigger_execution_service,
123		active_monitors,
124		networks,
125		Arc::new(Mutex::new(monitor_service)),
126		Arc::new(Mutex::new(network_service)),
127		Arc::new(Mutex::new(trigger_service)),
128	))
129}
130
131/// Creates a block handler function that processes new blocks from the blockchain.
132///
133/// # Arguments
134/// * `shutdown_tx` - Watch channel for shutdown signals
135/// * `filter_service` - Service for filtering blockchain data
136/// * `active_monitors` - List of active monitors
137/// * `client_pools` - Client pools for accessing blockchain clients
138///
139/// # Returns
140/// Returns a function that handles incoming blocks
141pub fn create_block_handler<P: ClientPoolTrait + 'static>(
142	shutdown_tx: watch::Sender<bool>,
143	filter_service: Arc<FilterService>,
144	active_monitors: Vec<Monitor>,
145	client_pools: Arc<P>,
146	contract_specs: Vec<(String, ContractSpec)>,
147) -> Arc<impl Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync> {
148	Arc::new(
149		move |block: BlockType, network: Network| -> BoxFuture<'static, ProcessedBlock> {
150			let filter_service = filter_service.clone();
151			let active_monitors = active_monitors.clone();
152			let client_pools = client_pools.clone();
153			let shutdown_tx = shutdown_tx.clone();
154			let contract_specs = contract_specs.clone();
155			Box::pin(async move {
156				let applicable_monitors = filter_network_monitors(&active_monitors, &network.slug);
157
158				let mut processed_block = ProcessedBlock {
159					block_number: block.number().unwrap_or(0),
160					network_slug: network.slug.clone(),
161					processing_results: Vec::new(),
162				};
163
164				if !applicable_monitors.is_empty() {
165					let mut shutdown_rx = shutdown_tx.subscribe();
166
167					let matches = match network.network_type {
168						BlockChainType::EVM => match client_pools.get_evm_client(&network).await {
169							Ok(client) => {
170								process_block(
171									client.as_ref(),
172									&network,
173									&block,
174									&applicable_monitors,
175									Some(&contract_specs),
176									&filter_service,
177									&mut shutdown_rx,
178								)
179								.await
180							}
181							Err(_) => None,
182						},
183						BlockChainType::Stellar => {
184							match client_pools.get_stellar_client(&network).await {
185								Ok(client) => {
186									process_block(
187										client.as_ref(),
188										&network,
189										&block,
190										&applicable_monitors,
191										Some(&contract_specs),
192										&filter_service,
193										&mut shutdown_rx,
194									)
195									.await
196								}
197								Err(_) => None,
198							}
199						}
200						BlockChainType::Midnight => {
201							match client_pools.get_midnight_client(&network).await {
202								Ok(client) => {
203									process_block(
204										client.as_ref(),
205										&network,
206										&block,
207										&applicable_monitors,
208										Some(&contract_specs),
209										&filter_service,
210										&mut shutdown_rx,
211									)
212									.await
213								}
214								Err(_) => None,
215							}
216						}
217					};
218
219					processed_block.processing_results = matches.unwrap_or_default();
220				}
221
222				processed_block
223			})
224		},
225	)
226}
227
228/// Processes a single block for all applicable monitors.
229///
230/// # Arguments
231/// * `client` - The client to use to process the block
232/// * `network` - The network the block belongs to
233/// * `block` - The block to process
234/// * `applicable_monitors` - List of monitors that apply to this network
235/// * `filter_service` - Service for filtering blockchain data
236/// * `shutdown_rx` - Receiver for shutdown signals
237pub async fn process_block<T>(
238	client: &T,
239	network: &Network,
240	block: &BlockType,
241	applicable_monitors: &[Monitor],
242	contract_specs: Option<&[(String, ContractSpec)]>,
243	filter_service: &FilterService,
244	shutdown_rx: &mut watch::Receiver<bool>,
245) -> Option<Vec<MonitorMatch>>
246where
247	T: BlockChainClient + BlockFilterFactory<T>,
248{
249	tokio::select! {
250		result = filter_service.filter_block(client, network, block, applicable_monitors, contract_specs) => {
251			result.ok()
252		}
253		_ = shutdown_rx.changed() => {
254			tracing::info!("Shutting down block processing task");
255			None
256		}
257	}
258}
259
260/// Get contract specs for all applicable monitors
261///
262/// # Arguments
263/// * `client_pool` - The client pool to use to get the contract specs
264/// * `network_monitors` - The monitors to get the contract specs for
265///
266/// # Returns
267/// Returns a vector of contract specs
268pub async fn get_contract_specs<P: ClientPoolTrait + 'static>(
269	client_pool: &Arc<P>,
270	network_monitors: &[(Network, Vec<Monitor>)],
271) -> Vec<(String, ContractSpec)> {
272	let mut all_specs = Vec::new();
273
274	for (network, monitors) in network_monitors {
275		for monitor in monitors {
276			let specs = match network.network_type {
277				BlockChainType::Stellar => {
278					let mut contract_specs = Vec::new();
279					let mut addresses_without_specs = Vec::new();
280					// First collect addresses that have contract specs configured in the monitor
281					for monitored_addr in &monitor.addresses {
282						if let Some(spec) = &monitored_addr.contract_spec {
283							let parsed_spec = match spec {
284								ContractSpec::Stellar(spec) => spec,
285								_ => {
286									tracing::warn!(
287										"Skipping non-Stellar contract spec for address {}",
288										monitored_addr.address
289									);
290									continue;
291								}
292							};
293
294							contract_specs.push((
295								stellar_helpers::normalize_address(&monitored_addr.address),
296								ContractSpec::Stellar(parsed_spec.clone()),
297							))
298						} else {
299							addresses_without_specs.push(monitored_addr.address.clone());
300						}
301					}
302
303					// Fetch remaining specs from chain
304					if !addresses_without_specs.is_empty() {
305						// Get the client once
306						let client: Arc<P::StellarClient> =
307							match client_pool.get_stellar_client(network).await {
308								Ok(client) => client,
309								Err(_) => {
310									tracing::warn!("Failed to get stellar client");
311									continue;
312								}
313							};
314
315						let chain_specs = futures::future::join_all(
316							addresses_without_specs.iter().map(|address| {
317								let client = client.clone();
318								async move {
319									let spec = client.get_contract_spec(address).await;
320									(address.clone(), spec)
321								}
322							}),
323						)
324						.await
325						.into_iter()
326						.filter_map(|(addr, spec)| match spec {
327							Ok(s) => Some((addr, s)),
328							Err(e) => {
329								tracing::warn!(
330									"Failed to fetch contract spec for address {}: {:?}",
331									addr,
332									e
333								);
334								None
335							}
336						})
337						.collect::<Vec<_>>();
338
339						contract_specs.extend(chain_specs);
340					}
341					contract_specs
342				}
343				BlockChainType::EVM => {
344					let mut contract_specs = Vec::new();
345					// First collect addresses that have contract specs configured in the monitor
346					for monitored_addr in &monitor.addresses {
347						if let Some(spec) = &monitored_addr.contract_spec {
348							let parsed_spec = match spec {
349								ContractSpec::EVM(spec) => spec,
350								_ => {
351									tracing::warn!(
352										"Skipping non-EVM contract spec for address {}",
353										monitored_addr.address
354									);
355									continue;
356								}
357							};
358
359							contract_specs.push((
360								format!(
361									"0x{}",
362									evm_helpers::normalize_address(&monitored_addr.address)
363								),
364								ContractSpec::EVM(parsed_spec.clone()),
365							))
366						}
367					}
368					contract_specs
369				}
370				_ => {
371					vec![]
372				}
373			};
374			all_specs.extend(specs);
375		}
376	}
377	all_specs
378}
379
380/// Creates a trigger handler function that processes trigger events from the block processing
381/// pipeline.
382///
383/// # Arguments
384/// * `shutdown_tx` - Watch channel for shutdown signals
385/// * `trigger_service` - Service for executing triggers
386///
387/// # Returns
388/// Returns a function that handles trigger execution for matching monitors
389pub fn create_trigger_handler<S: TriggerExecutionServiceTrait + Send + Sync + 'static>(
390	shutdown_tx: watch::Sender<bool>,
391	trigger_service: Arc<S>,
392	active_monitors_trigger_scripts: HashMap<String, (ScriptLanguage, String)>,
393) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync> {
394	Arc::new(move |block: &ProcessedBlock| {
395		let mut shutdown_rx = shutdown_tx.subscribe();
396		let trigger_service = trigger_service.clone();
397		let trigger_scripts = active_monitors_trigger_scripts.clone();
398		let block = block.clone();
399
400		tokio::spawn(async move {
401			tokio::select! {
402				_ = async {
403					if block.processing_results.is_empty() {
404						return;
405					}
406					let filtered_matches = run_trigger_filters(&block.processing_results, &block.network_slug, &trigger_scripts).await;
407					for monitor_match in &filtered_matches {
408						if let Err(e) = handle_match(monitor_match.clone(), &*trigger_service, &trigger_scripts).await {
409							TriggerError::execution_error(e.to_string(), Some(e.into()), None);
410						}
411					}
412				} => {}
413				_ = shutdown_rx.changed() => {
414					tracing::info!("Shutting down trigger handling task");
415				}
416			}
417		})
418	})
419}
420
421/// Checks if a network has any active monitors.
422///
423/// # Arguments
424/// * `monitors` - List of monitors to check
425/// * `network_slug` - Network identifier to check for
426///
427/// # Returns
428/// Returns true if there are any active monitors for the given network
429pub fn has_active_monitors(monitors: &[Monitor], network_slug: &String) -> bool {
430	monitors
431		.iter()
432		.any(|m| m.networks.contains(network_slug) && !m.paused)
433}
434
435/// Filters out paused monitors from the provided collection.
436///
437/// # Arguments
438/// * `monitors` - HashMap of monitors to filter
439///
440/// # Returns
441/// Returns a vector containing only active (non-paused) monitors
442fn filter_active_monitors(monitors: HashMap<String, Monitor>) -> Vec<Monitor> {
443	monitors
444		.into_values()
445		.filter(|m| !m.paused)
446		.collect::<Vec<_>>()
447}
448
449/// Filters monitors that are applicable to a specific network.
450///
451/// # Arguments
452/// * `monitors` - List of monitors to filter
453/// * `network_slug` - Network identifier to filter by
454///
455/// # Returns
456/// Returns a vector of monitors that are configured for the specified network
457fn filter_network_monitors(monitors: &[Monitor], network_slug: &String) -> Vec<Monitor> {
458	monitors
459		.iter()
460		.filter(|m| m.networks.contains(network_slug))
461		.cloned()
462		.collect()
463}
464
465async fn execute_trigger_condition(
466	trigger_condition: &TriggerConditions,
467	monitor_match: &MonitorMatch,
468	script_content: &(ScriptLanguage, String),
469) -> bool {
470	let executor = ScriptExecutorFactory::create(&script_content.0, &script_content.1);
471
472	let result = executor
473		.execute(
474			monitor_match.clone(),
475			&trigger_condition.timeout_ms,
476			trigger_condition.arguments.as_deref(),
477			false,
478		)
479		.await;
480
481	match result {
482		Ok(true) => true,
483		Err(e) => {
484			ScriptError::execution_error(e.to_string(), None, None);
485			false
486		}
487		_ => false,
488	}
489}
490
491async fn run_trigger_filters(
492	matches: &[MonitorMatch],
493	_network: &str,
494	trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
495) -> Vec<MonitorMatch> {
496	let mut filtered_matches = vec![];
497
498	for monitor_match in matches {
499		let mut is_filtered = false;
500		let trigger_conditions = match monitor_match {
501			MonitorMatch::EVM(evm_match) => &evm_match.monitor.trigger_conditions,
502			MonitorMatch::Stellar(stellar_match) => &stellar_match.monitor.trigger_conditions,
503			MonitorMatch::Midnight(midnight_match) => &midnight_match.monitor.trigger_conditions,
504		};
505
506		for trigger_condition in trigger_conditions {
507			let monitor_name = match monitor_match {
508				MonitorMatch::EVM(evm_match) => evm_match.monitor.name.clone(),
509				MonitorMatch::Stellar(stellar_match) => stellar_match.monitor.name.clone(),
510				MonitorMatch::Midnight(midnight_match) => midnight_match.monitor.name.clone(),
511			};
512
513			let script_content = trigger_scripts
514				.get(&format!(
515					"{}|{}",
516					normalize_string(&monitor_name),
517					trigger_condition.script_path
518				))
519				.ok_or_else(|| {
520					ScriptError::execution_error("Script content not found".to_string(), None, None)
521				});
522			if let Ok(script_content) = script_content {
523				if execute_trigger_condition(trigger_condition, monitor_match, script_content).await
524				{
525					is_filtered = true;
526					break;
527				}
528			}
529		}
530		if !is_filtered {
531			filtered_matches.push(monitor_match.clone());
532		}
533	}
534
535	filtered_matches
536}
537
538#[cfg(test)]
539mod tests {
540	use super::*;
541	use crate::{
542		models::{
543			EVMMonitorMatch, EVMReceiptLog, EVMTransaction, EVMTransactionReceipt, MatchConditions,
544			Monitor, MonitorMatch, ScriptLanguage, StellarBlock, StellarMonitorMatch,
545			StellarTransaction, StellarTransactionInfo, TriggerConditions,
546		},
547		utils::tests::{builders::evm::monitor::MonitorBuilder, evm::receipt::ReceiptBuilder},
548	};
549	use alloy::{
550		consensus::{transaction::Recovered, Signed, TxEnvelope},
551		primitives::{Address, Bytes, TxKind, B256, U256},
552	};
553	use std::io::Write;
554	use tempfile::NamedTempFile;
555
556	// Helper function to create a temporary script file
557	fn create_temp_script(content: &str) -> NamedTempFile {
558		let mut file = NamedTempFile::new().unwrap();
559		file.write_all(content.as_bytes()).unwrap();
560		file
561	}
562	fn create_test_monitor(
563		name: &str,
564		networks: Vec<&str>,
565		paused: bool,
566		script_path: Option<&str>,
567	) -> Monitor {
568		let mut builder = MonitorBuilder::new()
569			.name(name)
570			.networks(networks.into_iter().map(|s| s.to_string()).collect())
571			.paused(paused);
572
573		if let Some(path) = script_path {
574			builder = builder.trigger_condition(path, 1000, ScriptLanguage::Python, None);
575		}
576
577		builder.build()
578	}
579
580	fn create_test_evm_transaction_receipt() -> EVMTransactionReceipt {
581		ReceiptBuilder::new().build()
582	}
583
584	fn create_test_evm_logs() -> Vec<EVMReceiptLog> {
585		ReceiptBuilder::new().build().logs.clone()
586	}
587
588	fn create_test_evm_transaction() -> EVMTransaction {
589		let tx = alloy::consensus::TxLegacy {
590			chain_id: None,
591			nonce: 0,
592			gas_price: 0,
593			gas_limit: 0,
594			to: TxKind::Call(Address::ZERO),
595			value: U256::ZERO,
596			input: Bytes::default(),
597		};
598
599		let signature =
600			alloy::signers::Signature::from_scalars_and_parity(B256::ZERO, B256::ZERO, false);
601
602		let hash = B256::ZERO;
603
604		EVMTransaction::from(alloy::rpc::types::Transaction {
605			inner: Recovered::new_unchecked(
606				TxEnvelope::Legacy(Signed::new_unchecked(tx, signature, hash)),
607				Address::ZERO,
608			),
609			block_hash: None,
610			block_number: None,
611			transaction_index: None,
612			effective_gas_price: None,
613		})
614	}
615
616	fn create_test_stellar_transaction() -> StellarTransaction {
617		StellarTransaction::from({
618			StellarTransactionInfo {
619				..Default::default()
620			}
621		})
622	}
623
624	fn create_test_stellar_block() -> StellarBlock {
625		StellarBlock::default()
626	}
627
628	fn create_mock_monitor_match_from_path(
629		blockchain_type: BlockChainType,
630		script_path: Option<&str>,
631	) -> MonitorMatch {
632		match blockchain_type {
633			BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
634				monitor: create_test_monitor("test", vec![], false, script_path),
635				transaction: create_test_evm_transaction(),
636				receipt: Some(create_test_evm_transaction_receipt()),
637				logs: Some(create_test_evm_logs()),
638				network_slug: "ethereum_mainnet".to_string(),
639				matched_on: MatchConditions {
640					functions: vec![],
641					events: vec![],
642					transactions: vec![],
643				},
644				matched_on_args: None,
645			})),
646			BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
647				monitor: create_test_monitor("test", vec![], false, script_path),
648				transaction: create_test_stellar_transaction(),
649				ledger: create_test_stellar_block(),
650				network_slug: "stellar_mainnet".to_string(),
651				matched_on: MatchConditions {
652					functions: vec![],
653					events: vec![],
654					transactions: vec![],
655				},
656				matched_on_args: None,
657			})),
658			BlockChainType::Midnight => unimplemented!(),
659		}
660	}
661
662	fn create_mock_monitor_match_from_monitor(
663		blockchain_type: BlockChainType,
664		monitor: Monitor,
665	) -> MonitorMatch {
666		match blockchain_type {
667			BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
668				monitor,
669				transaction: create_test_evm_transaction(),
670				receipt: Some(create_test_evm_transaction_receipt()),
671				logs: Some(create_test_evm_logs()),
672				network_slug: "ethereum_mainnet".to_string(),
673				matched_on: MatchConditions {
674					functions: vec![],
675					events: vec![],
676					transactions: vec![],
677				},
678				matched_on_args: None,
679			})),
680			BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
681				monitor,
682				transaction: create_test_stellar_transaction(),
683				ledger: create_test_stellar_block(),
684				network_slug: "stellar_mainnet".to_string(),
685				matched_on: MatchConditions {
686					functions: vec![],
687					events: vec![],
688					transactions: vec![],
689				},
690				matched_on_args: None,
691			})),
692			BlockChainType::Midnight => unimplemented!(),
693		}
694	}
695
696	fn matches_equal(a: &MonitorMatch, b: &MonitorMatch) -> bool {
697		match (a, b) {
698			(MonitorMatch::EVM(a), MonitorMatch::EVM(b)) => a.monitor.name == b.monitor.name,
699			(MonitorMatch::Stellar(a), MonitorMatch::Stellar(b)) => {
700				a.monitor.name == b.monitor.name
701			}
702			_ => false,
703		}
704	}
705
706	#[test]
707	fn test_has_active_monitors() {
708		let monitors = vec![
709			create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
710			create_test_monitor("2", vec!["ethereum_sepolia"], false, None),
711			create_test_monitor(
712				"3",
713				vec!["ethereum_mainnet", "ethereum_sepolia"],
714				false,
715				None,
716			),
717			create_test_monitor("4", vec!["stellar_mainnet"], true, None),
718		];
719
720		assert!(has_active_monitors(
721			&monitors,
722			&"ethereum_mainnet".to_string()
723		));
724		assert!(has_active_monitors(
725			&monitors,
726			&"ethereum_sepolia".to_string()
727		));
728		assert!(!has_active_monitors(
729			&monitors,
730			&"midnight_mainnet".to_string()
731		));
732		assert!(!has_active_monitors(
733			&monitors,
734			&"stellar_mainnet".to_string()
735		));
736	}
737
738	#[test]
739	fn test_filter_active_monitors() {
740		let mut monitors = HashMap::new();
741		monitors.insert(
742			"1".to_string(),
743			create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
744		);
745		monitors.insert(
746			"2".to_string(),
747			create_test_monitor("2", vec!["stellar_mainnet"], true, None),
748		);
749		monitors.insert(
750			"3".to_string(),
751			create_test_monitor("3", vec!["ethereum_mainnet"], false, None),
752		);
753
754		let active_monitors = filter_active_monitors(monitors);
755		assert_eq!(active_monitors.len(), 2);
756		assert!(active_monitors.iter().all(|m| !m.paused));
757	}
758
759	#[test]
760	fn test_filter_network_monitors() {
761		let monitors = vec![
762			create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
763			create_test_monitor("2", vec!["stellar_mainnet"], true, None),
764			create_test_monitor(
765				"3",
766				vec!["ethereum_mainnet", "stellar_mainnet"],
767				false,
768				None,
769			),
770		];
771
772		let eth_monitors = filter_network_monitors(&monitors, &"ethereum_mainnet".to_string());
773		assert_eq!(eth_monitors.len(), 2);
774		assert!(eth_monitors
775			.iter()
776			.all(|m| m.networks.contains(&"ethereum_mainnet".to_string())));
777
778		let stellar_monitors = filter_network_monitors(&monitors, &"stellar_mainnet".to_string());
779		assert_eq!(stellar_monitors.len(), 2);
780		assert!(stellar_monitors
781			.iter()
782			.all(|m| m.networks.contains(&"stellar_mainnet".to_string())));
783
784		let midnight_monitors = filter_network_monitors(&monitors, &"midnight_mainnet".to_string());
785		assert!(midnight_monitors.is_empty());
786	}
787
788	#[tokio::test]
789	async fn test_run_trigger_filters_empty_matches() {
790		// Create empty matches vector
791		let matches: Vec<MonitorMatch> = vec![];
792
793		// Create trigger scripts with a more realistic script path
794		let mut trigger_scripts = HashMap::new();
795		trigger_scripts.insert(
796			"monitor_test-test.py".to_string(), // Using a more realistic key format
797			(
798				ScriptLanguage::Python,
799				r#"
800import sys
801import json
802
803input_data = sys.stdin.read()
804data = json.loads(input_data)
805print(False)
806            "#
807				.to_string(),
808			),
809		);
810
811		// Test the filter function
812		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
813		assert!(filtered.is_empty());
814	}
815
816	#[tokio::test]
817	async fn test_run_trigger_filters_true_condition() {
818		let script_content = r#"
819import sys
820import json
821
822input_json = sys.argv[1]
823data = json.loads(input_json)
824print("debugging...")
825def test():
826	return True
827result = test()
828print(result)
829"#;
830		let temp_file = create_temp_script(script_content);
831		let mut trigger_scripts = HashMap::new();
832		trigger_scripts.insert(
833			format!("test-{}", temp_file.path().to_str().unwrap()),
834			(ScriptLanguage::Python, script_content.to_string()),
835		);
836		let match_item = create_mock_monitor_match_from_path(
837			BlockChainType::EVM,
838			Some(temp_file.path().to_str().unwrap()),
839		);
840		let matches = vec![match_item.clone()];
841
842		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
843		assert_eq!(filtered.len(), 1);
844		assert!(matches_equal(&filtered[0], &match_item));
845	}
846
847	#[tokio::test]
848	async fn test_run_trigger_filters_false_condition() {
849		let script_content = r#"
850import sys
851import json
852
853input_data = sys.stdin.read()
854data = json.loads(input_data)
855print("debugging...")
856def test():
857	return False
858result = test()
859print(result)
860"#;
861		let temp_file = create_temp_script(script_content);
862		let mut trigger_scripts = HashMap::new();
863		trigger_scripts.insert(
864			format!("test-{}", temp_file.path().to_str().unwrap()),
865			(ScriptLanguage::Python, script_content.to_string()),
866		);
867		let match_item = create_mock_monitor_match_from_path(
868			BlockChainType::EVM,
869			Some(temp_file.path().to_str().unwrap()),
870		);
871		let matches = vec![match_item.clone()];
872
873		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
874		assert_eq!(filtered.len(), 1);
875	}
876
877	#[tokio::test]
878	async fn test_execute_trigger_condition_returns_false() {
879		let script_content = r#"print(False)  # Script returns false"#;
880		let temp_file = create_temp_script(script_content);
881		let trigger_condition = TriggerConditions {
882			language: ScriptLanguage::Python,
883			script_path: temp_file.path().to_str().unwrap().to_string(),
884			timeout_ms: 1000,
885			arguments: None,
886		};
887		let match_item = create_mock_monitor_match_from_path(
888			BlockChainType::EVM,
889			Some(temp_file.path().to_str().unwrap()),
890		);
891		let script_content = (ScriptLanguage::Python, script_content.to_string());
892
893		let result =
894			execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
895		assert!(!result); // Should be false when script returns false
896	}
897
898	#[tokio::test]
899	async fn test_execute_trigger_condition_script_error() {
900		let script_content = r#"raise Exception("Test error")  # Raise an error"#;
901		let temp_file = create_temp_script(script_content);
902		let trigger_condition = TriggerConditions {
903			language: ScriptLanguage::Python,
904			script_path: temp_file.path().to_str().unwrap().to_string(),
905			timeout_ms: 1000,
906			arguments: None,
907		};
908		let match_item = create_mock_monitor_match_from_path(
909			BlockChainType::EVM,
910			Some(temp_file.path().to_str().unwrap()),
911		);
912		let script_content = (ScriptLanguage::Python, script_content.to_string());
913
914		let result =
915			execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
916		assert!(!result); // Should be false when script errors
917	}
918
919	#[tokio::test]
920	async fn test_execute_trigger_condition_invalid_script() {
921		let trigger_condition = TriggerConditions {
922			language: ScriptLanguage::Python,
923			script_path: "non_existent_script.py".to_string(),
924			timeout_ms: 1000,
925			arguments: None,
926		};
927		let match_item = create_mock_monitor_match_from_path(
928			BlockChainType::EVM,
929			Some("non_existent_script.py"),
930		);
931		let script_content = (ScriptLanguage::Python, "invalid script content".to_string());
932
933		let result =
934			execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
935		assert!(!result); // Should be false for invalid script
936	}
937
938	#[tokio::test]
939	async fn test_run_trigger_filters_multiple_conditions_keep_match() {
940		// Create a monitor with two trigger conditions
941		let monitor = MonitorBuilder::new()
942			.name("monitor_test")
943			.networks(vec!["ethereum_mainnet".to_string()])
944			.trigger_condition("test1.py", 1000, ScriptLanguage::Python, None)
945			.trigger_condition("test2.py", 1000, ScriptLanguage::Python, None)
946			.build();
947
948		// Create a match with this monitor
949		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
950
951		let mut trigger_scripts = HashMap::new();
952		trigger_scripts.insert(
953			"monitor_test|test1.py".to_string(),
954			(
955				ScriptLanguage::Python,
956				r#"
957import sys
958import json
959
960input_data = sys.stdin.read()
961data = json.loads(input_data)
962print(True)
963"#
964				.to_string(),
965			),
966		);
967		trigger_scripts.insert(
968			"monitor_test|test2.py".to_string(),
969			(
970				ScriptLanguage::Python,
971				r#"
972import sys
973import json
974input_data = sys.stdin.read()
975data = json.loads(input_data)
976print(True)
977"#
978				.to_string(),
979			),
980		);
981
982		// Run the filter with our test data
983		let matches = vec![match_item.clone()];
984		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
985
986		assert_eq!(filtered.len(), 0);
987	}
988
989	#[tokio::test]
990	async fn test_run_trigger_filters_condition_two_combinations_exclude_match() {
991		let monitor = MonitorBuilder::new()
992			.name("monitor_test")
993			.networks(vec!["ethereum_mainnet".to_string()])
994			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
995			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
996			.build();
997
998		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
999
1000		// Test case 1: All conditions return true - match should be kept
1001		let mut trigger_scripts = HashMap::new();
1002		trigger_scripts.insert(
1003			"monitor_test|condition1.py".to_string(),
1004			(ScriptLanguage::Python, "print(True)".to_string()),
1005		);
1006		trigger_scripts.insert(
1007			"monitor_test|condition2.py".to_string(),
1008			(ScriptLanguage::Python, "print(False)".to_string()),
1009		);
1010
1011		let matches = vec![match_item.clone()];
1012		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1013		assert_eq!(filtered.len(), 0);
1014	}
1015
1016	#[tokio::test]
1017	async fn test_run_trigger_filters_condition_two_combinations_keep_match() {
1018		let monitor = MonitorBuilder::new()
1019			.name("monitor_test")
1020			.networks(vec!["ethereum_mainnet".to_string()])
1021			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1022			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1023			.build();
1024
1025		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1026
1027		let mut trigger_scripts = HashMap::new();
1028		trigger_scripts.insert(
1029			"monitor_test|condition1.py".to_string(),
1030			(ScriptLanguage::Python, "print(False)".to_string()),
1031		);
1032		trigger_scripts.insert(
1033			"monitor_test|condition2.py".to_string(),
1034			(ScriptLanguage::Python, "print(False)".to_string()),
1035		);
1036
1037		let matches = vec![match_item.clone()];
1038		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1039		assert_eq!(filtered.len(), 1);
1040	}
1041
1042	#[tokio::test]
1043	async fn test_run_trigger_filters_condition_two_combinations_exclude_match_last_condition() {
1044		let monitor = MonitorBuilder::new()
1045			.name("monitor_test")
1046			.networks(vec!["ethereum_mainnet".to_string()])
1047			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1048			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1049			.build();
1050
1051		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1052
1053		let mut trigger_scripts = HashMap::new();
1054		trigger_scripts.insert(
1055			"monitor_test|condition1.py".to_string(),
1056			(ScriptLanguage::Python, "print(False)".to_string()),
1057		);
1058		trigger_scripts.insert(
1059			"monitor_test|condition2.py".to_string(),
1060			(ScriptLanguage::Python, "print(True)".to_string()),
1061		);
1062
1063		let matches = vec![match_item.clone()];
1064		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1065		assert_eq!(filtered.len(), 0);
1066	}
1067
1068	#[tokio::test]
1069	async fn test_run_trigger_filters_condition_three_combinations_exclude_match() {
1070		let monitor = MonitorBuilder::new()
1071			.name("monitor_test")
1072			.networks(vec!["ethereum_mainnet".to_string()])
1073			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1074			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1075			.trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1076			.build();
1077
1078		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1079
1080		let mut trigger_scripts = HashMap::new();
1081		trigger_scripts.insert(
1082			"monitor_test|condition1.py".to_string(),
1083			(ScriptLanguage::Python, "print(False)".to_string()),
1084		);
1085		trigger_scripts.insert(
1086			"monitor_test|condition2.py".to_string(),
1087			(ScriptLanguage::Python, "print(False)".to_string()),
1088		);
1089		trigger_scripts.insert(
1090			"monitor_test|condition3.py".to_string(),
1091			(ScriptLanguage::Python, "print(True)".to_string()),
1092		);
1093
1094		let matches = vec![match_item.clone()];
1095		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1096		assert_eq!(filtered.len(), 0);
1097	}
1098
1099	#[tokio::test]
1100	async fn test_run_trigger_filters_condition_three_combinations_keep_match() {
1101		let monitor = MonitorBuilder::new()
1102			.name("monitor_test")
1103			.networks(vec!["ethereum_mainnet".to_string()])
1104			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1105			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1106			.trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1107			.build();
1108
1109		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1110
1111		let mut trigger_scripts = HashMap::new();
1112		trigger_scripts.insert(
1113			"monitor_test|condition1.py".to_string(),
1114			(ScriptLanguage::Python, "print(False)".to_string()),
1115		);
1116		trigger_scripts.insert(
1117			"monitor_test|condition2.py".to_string(),
1118			(ScriptLanguage::Python, "print(False)".to_string()),
1119		);
1120		trigger_scripts.insert(
1121			"monitor_test|condition3.py".to_string(),
1122			(ScriptLanguage::Python, "print(False)".to_string()),
1123		);
1124
1125		let matches = vec![match_item.clone()];
1126		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1127		assert_eq!(filtered.len(), 1);
1128	}
1129
	// Stellar-network cases for `run_trigger_filters`.
1131	#[tokio::test]
1132	async fn test_run_trigger_filters_stellar_empty_matches() {
1133		let matches: Vec<MonitorMatch> = vec![];
1134		let mut trigger_scripts = HashMap::new();
1135		trigger_scripts.insert(
1136			"monitor_test|test.py".to_string(),
1137			(
1138				ScriptLanguage::Python,
1139				r#"
1140import sys
1141import json
1142
1143input_data = sys.stdin.read()
1144data = json.loads(input_data)
1145print(False)
1146"#
1147				.to_string(),
1148			),
1149		);
1150
1151		let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1152		assert!(filtered.is_empty());
1153	}
1154
1155	#[tokio::test]
1156	async fn test_run_trigger_filters_stellar_true_condition() {
1157		let script_content = r#"
1158import sys
1159import json
1160
1161input_json = sys.argv[1]
1162data = json.loads(input_json)
1163print("debugging...")
1164def test():
1165	return True
1166result = test()
1167print(result)
1168"#;
1169		let temp_file = create_temp_script(script_content);
1170		let mut trigger_scripts = HashMap::new();
1171		trigger_scripts.insert(
1172			format!("test|{}", temp_file.path().to_str().unwrap()),
1173			(ScriptLanguage::Python, script_content.to_string()),
1174		);
1175		let match_item = create_mock_monitor_match_from_path(
1176			BlockChainType::Stellar,
1177			Some(temp_file.path().to_str().unwrap()),
1178		);
1179		let matches = vec![match_item.clone()];
1180
1181		let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1182		assert_eq!(filtered.len(), 1);
1183		assert!(matches_equal(&filtered[0], &match_item));
1184	}
1185
1186	#[tokio::test]
1187	async fn test_run_trigger_filters_stellar_multiple_conditions() {
1188		let monitor = MonitorBuilder::new()
1189			.name("monitor_test")
1190			.networks(vec!["stellar_mainnet".to_string()])
1191			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1192			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1193			.build();
1194
1195		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::Stellar, monitor);
1196
1197		let mut trigger_scripts = HashMap::new();
1198		trigger_scripts.insert(
1199			"monitor_test|condition1.py".to_string(),
1200			(ScriptLanguage::Python, "print(False)".to_string()),
1201		);
1202		trigger_scripts.insert(
1203			"monitor_test|condition2.py".to_string(),
1204			(ScriptLanguage::Python, "print(True)".to_string()),
1205		);
1206
1207		let matches = vec![match_item.clone()];
1208		let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1209		assert_eq!(filtered.len(), 0); // Match should be filtered out because condition2 returns true
1210	}
1211}