1use futures::future::BoxFuture;
19use std::{collections::HashMap, error::Error, sync::Arc};
20use tokio::sync::{watch, Mutex};
21
22use crate::{
23 models::{
24 BlockChainType, BlockType, ContractSpec, Monitor, MonitorMatch, Network, ProcessedBlock,
25 ScriptLanguage, TriggerConditions,
26 },
27 repositories::{
28 MonitorRepositoryTrait, MonitorService, NetworkRepositoryTrait, NetworkService,
29 TriggerRepositoryTrait, TriggerService,
30 },
31 services::{
32 blockchain::{BlockChainClient, BlockFilterFactory, ClientPoolTrait},
33 filter::{evm_helpers, handle_match, stellar_helpers, FilterService},
34 notification::NotificationService,
35 trigger::{
36 ScriptError, ScriptExecutorFactory, TriggerError, TriggerExecutionService,
37 TriggerExecutionServiceTrait,
38 },
39 },
40 utils::normalize_string,
41};
42
43pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
45
46type ServiceResult<M, N, T> = Result<(
47 Arc<FilterService>,
48 Arc<TriggerExecutionService<T>>,
49 Vec<Monitor>,
50 HashMap<String, Network>,
51 Arc<Mutex<MonitorService<M, N, T>>>,
52 Arc<Mutex<NetworkService<N>>>,
53 Arc<Mutex<TriggerService<T>>>,
54)>;
55
56pub async fn initialize_services<M, N, T>(
70 monitor_service: Option<MonitorService<M, N, T>>,
71 network_service: Option<NetworkService<N>>,
72 trigger_service: Option<TriggerService<T>>,
73) -> ServiceResult<M, N, T>
74where
75 M: MonitorRepositoryTrait<N, T> + Send + Sync + 'static,
76 N: NetworkRepositoryTrait + Send + Sync + 'static,
77 T: TriggerRepositoryTrait + Send + Sync + 'static,
78{
79 let network_service = match network_service {
80 Some(service) => service,
81 None => {
82 let repository = N::new(None).await?;
83 NetworkService::<N>::new_with_repository(repository)?
84 }
85 };
86
87 let trigger_service = match trigger_service {
88 Some(service) => service,
89 None => {
90 let repository = T::new(None).await?;
91 TriggerService::<T>::new_with_repository(repository)?
92 }
93 };
94
95 let monitor_service = match monitor_service {
96 Some(service) => service,
97 None => {
98 let repository = M::new(
99 None,
100 Some(network_service.clone()),
101 Some(trigger_service.clone()),
102 )
103 .await?;
104 MonitorService::<M, N, T>::new_with_repository(repository)?
105 }
106 };
107
108 let notification_service = NotificationService::new();
109
110 let filter_service = Arc::new(FilterService::new());
111 let trigger_execution_service = Arc::new(TriggerExecutionService::new(
112 trigger_service.clone(),
113 notification_service,
114 ));
115
116 let monitors = monitor_service.get_all();
117 let active_monitors = filter_active_monitors(monitors);
118 let networks = network_service.get_all();
119
120 Ok((
121 filter_service,
122 trigger_execution_service,
123 active_monitors,
124 networks,
125 Arc::new(Mutex::new(monitor_service)),
126 Arc::new(Mutex::new(network_service)),
127 Arc::new(Mutex::new(trigger_service)),
128 ))
129}
130
131pub fn create_block_handler<P: ClientPoolTrait + 'static>(
142 shutdown_tx: watch::Sender<bool>,
143 filter_service: Arc<FilterService>,
144 active_monitors: Vec<Monitor>,
145 client_pools: Arc<P>,
146 contract_specs: Vec<(String, ContractSpec)>,
147) -> Arc<impl Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync> {
148 Arc::new(
149 move |block: BlockType, network: Network| -> BoxFuture<'static, ProcessedBlock> {
150 let filter_service = filter_service.clone();
151 let active_monitors = active_monitors.clone();
152 let client_pools = client_pools.clone();
153 let shutdown_tx = shutdown_tx.clone();
154 let contract_specs = contract_specs.clone();
155 Box::pin(async move {
156 let applicable_monitors = filter_network_monitors(&active_monitors, &network.slug);
157
158 let mut processed_block = ProcessedBlock {
159 block_number: block.number().unwrap_or(0),
160 network_slug: network.slug.clone(),
161 processing_results: Vec::new(),
162 };
163
164 if !applicable_monitors.is_empty() {
165 let mut shutdown_rx = shutdown_tx.subscribe();
166
167 let matches = match network.network_type {
168 BlockChainType::EVM => match client_pools.get_evm_client(&network).await {
169 Ok(client) => {
170 process_block(
171 client.as_ref(),
172 &network,
173 &block,
174 &applicable_monitors,
175 Some(&contract_specs),
176 &filter_service,
177 &mut shutdown_rx,
178 )
179 .await
180 }
181 Err(_) => None,
182 },
183 BlockChainType::Stellar => {
184 match client_pools.get_stellar_client(&network).await {
185 Ok(client) => {
186 process_block(
187 client.as_ref(),
188 &network,
189 &block,
190 &applicable_monitors,
191 Some(&contract_specs),
192 &filter_service,
193 &mut shutdown_rx,
194 )
195 .await
196 }
197 Err(_) => None,
198 }
199 }
200 BlockChainType::Midnight => {
201 match client_pools.get_midnight_client(&network).await {
202 Ok(client) => {
203 process_block(
204 client.as_ref(),
205 &network,
206 &block,
207 &applicable_monitors,
208 Some(&contract_specs),
209 &filter_service,
210 &mut shutdown_rx,
211 )
212 .await
213 }
214 Err(_) => None,
215 }
216 }
217 };
218
219 processed_block.processing_results = matches.unwrap_or_default();
220 }
221
222 processed_block
223 })
224 },
225 )
226}
227
228pub async fn process_block<T>(
238 client: &T,
239 network: &Network,
240 block: &BlockType,
241 applicable_monitors: &[Monitor],
242 contract_specs: Option<&[(String, ContractSpec)]>,
243 filter_service: &FilterService,
244 shutdown_rx: &mut watch::Receiver<bool>,
245) -> Option<Vec<MonitorMatch>>
246where
247 T: BlockChainClient + BlockFilterFactory<T>,
248{
249 tokio::select! {
250 result = filter_service.filter_block(client, network, block, applicable_monitors, contract_specs) => {
251 result.ok()
252 }
253 _ = shutdown_rx.changed() => {
254 tracing::info!("Shutting down block processing task");
255 None
256 }
257 }
258}
259
260pub async fn get_contract_specs<P: ClientPoolTrait + 'static>(
269 client_pool: &Arc<P>,
270 network_monitors: &[(Network, Vec<Monitor>)],
271) -> Vec<(String, ContractSpec)> {
272 let mut all_specs = Vec::new();
273
274 for (network, monitors) in network_monitors {
275 for monitor in monitors {
276 let specs = match network.network_type {
277 BlockChainType::Stellar => {
278 let mut contract_specs = Vec::new();
279 let mut addresses_without_specs = Vec::new();
280 for monitored_addr in &monitor.addresses {
282 if let Some(spec) = &monitored_addr.contract_spec {
283 let parsed_spec = match spec {
284 ContractSpec::Stellar(spec) => spec,
285 _ => {
286 tracing::warn!(
287 "Skipping non-Stellar contract spec for address {}",
288 monitored_addr.address
289 );
290 continue;
291 }
292 };
293
294 contract_specs.push((
295 stellar_helpers::normalize_address(&monitored_addr.address),
296 ContractSpec::Stellar(parsed_spec.clone()),
297 ))
298 } else {
299 addresses_without_specs.push(monitored_addr.address.clone());
300 }
301 }
302
303 if !addresses_without_specs.is_empty() {
305 let client: Arc<P::StellarClient> =
307 match client_pool.get_stellar_client(network).await {
308 Ok(client) => client,
309 Err(_) => {
310 tracing::warn!("Failed to get stellar client");
311 continue;
312 }
313 };
314
315 let chain_specs = futures::future::join_all(
316 addresses_without_specs.iter().map(|address| {
317 let client = client.clone();
318 async move {
319 let spec = client.get_contract_spec(address).await;
320 (address.clone(), spec)
321 }
322 }),
323 )
324 .await
325 .into_iter()
326 .filter_map(|(addr, spec)| match spec {
327 Ok(s) => Some((addr, s)),
328 Err(e) => {
329 tracing::warn!(
330 "Failed to fetch contract spec for address {}: {:?}",
331 addr,
332 e
333 );
334 None
335 }
336 })
337 .collect::<Vec<_>>();
338
339 contract_specs.extend(chain_specs);
340 }
341 contract_specs
342 }
343 BlockChainType::EVM => {
344 let mut contract_specs = Vec::new();
345 for monitored_addr in &monitor.addresses {
347 if let Some(spec) = &monitored_addr.contract_spec {
348 let parsed_spec = match spec {
349 ContractSpec::EVM(spec) => spec,
350 _ => {
351 tracing::warn!(
352 "Skipping non-EVM contract spec for address {}",
353 monitored_addr.address
354 );
355 continue;
356 }
357 };
358
359 contract_specs.push((
360 format!(
361 "0x{}",
362 evm_helpers::normalize_address(&monitored_addr.address)
363 ),
364 ContractSpec::EVM(parsed_spec.clone()),
365 ))
366 }
367 }
368 contract_specs
369 }
370 _ => {
371 vec![]
372 }
373 };
374 all_specs.extend(specs);
375 }
376 }
377 all_specs
378}
379
380pub fn create_trigger_handler<S: TriggerExecutionServiceTrait + Send + Sync + 'static>(
390 shutdown_tx: watch::Sender<bool>,
391 trigger_service: Arc<S>,
392 active_monitors_trigger_scripts: HashMap<String, (ScriptLanguage, String)>,
393) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync> {
394 Arc::new(move |block: &ProcessedBlock| {
395 let mut shutdown_rx = shutdown_tx.subscribe();
396 let trigger_service = trigger_service.clone();
397 let trigger_scripts = active_monitors_trigger_scripts.clone();
398 let block = block.clone();
399
400 tokio::spawn(async move {
401 tokio::select! {
402 _ = async {
403 if block.processing_results.is_empty() {
404 return;
405 }
406 let filtered_matches = run_trigger_filters(&block.processing_results, &block.network_slug, &trigger_scripts).await;
407 for monitor_match in &filtered_matches {
408 if let Err(e) = handle_match(monitor_match.clone(), &*trigger_service, &trigger_scripts).await {
409 TriggerError::execution_error(e.to_string(), Some(e.into()), None);
410 }
411 }
412 } => {}
413 _ = shutdown_rx.changed() => {
414 tracing::info!("Shutting down trigger handling task");
415 }
416 }
417 })
418 })
419}
420
421pub fn has_active_monitors(monitors: &[Monitor], network_slug: &String) -> bool {
430 monitors
431 .iter()
432 .any(|m| m.networks.contains(network_slug) && !m.paused)
433}
434
435fn filter_active_monitors(monitors: HashMap<String, Monitor>) -> Vec<Monitor> {
443 monitors
444 .into_values()
445 .filter(|m| !m.paused)
446 .collect::<Vec<_>>()
447}
448
449fn filter_network_monitors(monitors: &[Monitor], network_slug: &String) -> Vec<Monitor> {
458 monitors
459 .iter()
460 .filter(|m| m.networks.contains(network_slug))
461 .cloned()
462 .collect()
463}
464
465async fn execute_trigger_condition(
466 trigger_condition: &TriggerConditions,
467 monitor_match: &MonitorMatch,
468 script_content: &(ScriptLanguage, String),
469) -> bool {
470 let executor = ScriptExecutorFactory::create(&script_content.0, &script_content.1);
471
472 let result = executor
473 .execute(
474 monitor_match.clone(),
475 &trigger_condition.timeout_ms,
476 trigger_condition.arguments.as_deref(),
477 false,
478 )
479 .await;
480
481 match result {
482 Ok(true) => true,
483 Err(e) => {
484 ScriptError::execution_error(e.to_string(), None, None);
485 false
486 }
487 _ => false,
488 }
489}
490
491async fn run_trigger_filters(
492 matches: &[MonitorMatch],
493 _network: &str,
494 trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
495) -> Vec<MonitorMatch> {
496 let mut filtered_matches = vec![];
497
498 for monitor_match in matches {
499 let mut is_filtered = false;
500 let trigger_conditions = match monitor_match {
501 MonitorMatch::EVM(evm_match) => &evm_match.monitor.trigger_conditions,
502 MonitorMatch::Stellar(stellar_match) => &stellar_match.monitor.trigger_conditions,
503 MonitorMatch::Midnight(midnight_match) => &midnight_match.monitor.trigger_conditions,
504 };
505
506 for trigger_condition in trigger_conditions {
507 let monitor_name = match monitor_match {
508 MonitorMatch::EVM(evm_match) => evm_match.monitor.name.clone(),
509 MonitorMatch::Stellar(stellar_match) => stellar_match.monitor.name.clone(),
510 MonitorMatch::Midnight(midnight_match) => midnight_match.monitor.name.clone(),
511 };
512
513 let script_content = trigger_scripts
514 .get(&format!(
515 "{}|{}",
516 normalize_string(&monitor_name),
517 trigger_condition.script_path
518 ))
519 .ok_or_else(|| {
520 ScriptError::execution_error("Script content not found".to_string(), None, None)
521 });
522 if let Ok(script_content) = script_content {
523 if execute_trigger_condition(trigger_condition, monitor_match, script_content).await
524 {
525 is_filtered = true;
526 break;
527 }
528 }
529 }
530 if !is_filtered {
531 filtered_matches.push(monitor_match.clone());
532 }
533 }
534
535 filtered_matches
536}
537
538#[cfg(test)]
539mod tests {
540 use super::*;
541 use crate::{
542 models::{
543 EVMMonitorMatch, EVMReceiptLog, EVMTransaction, EVMTransactionReceipt, MatchConditions,
544 Monitor, MonitorMatch, ScriptLanguage, StellarBlock, StellarMonitorMatch,
545 StellarTransaction, StellarTransactionInfo, TriggerConditions,
546 },
547 utils::tests::{builders::evm::monitor::MonitorBuilder, evm::receipt::ReceiptBuilder},
548 };
549 use alloy::{
550 consensus::{transaction::Recovered, Signed, TxEnvelope},
551 primitives::{Address, Bytes, TxKind, B256, U256},
552 };
553 use std::io::Write;
554 use tempfile::NamedTempFile;
555
556 fn create_temp_script(content: &str) -> NamedTempFile {
558 let mut file = NamedTempFile::new().unwrap();
559 file.write_all(content.as_bytes()).unwrap();
560 file
561 }
562 fn create_test_monitor(
563 name: &str,
564 networks: Vec<&str>,
565 paused: bool,
566 script_path: Option<&str>,
567 ) -> Monitor {
568 let mut builder = MonitorBuilder::new()
569 .name(name)
570 .networks(networks.into_iter().map(|s| s.to_string()).collect())
571 .paused(paused);
572
573 if let Some(path) = script_path {
574 builder = builder.trigger_condition(path, 1000, ScriptLanguage::Python, None);
575 }
576
577 builder.build()
578 }
579
580 fn create_test_evm_transaction_receipt() -> EVMTransactionReceipt {
581 ReceiptBuilder::new().build()
582 }
583
584 fn create_test_evm_logs() -> Vec<EVMReceiptLog> {
585 ReceiptBuilder::new().build().logs.clone()
586 }
587
588 fn create_test_evm_transaction() -> EVMTransaction {
589 let tx = alloy::consensus::TxLegacy {
590 chain_id: None,
591 nonce: 0,
592 gas_price: 0,
593 gas_limit: 0,
594 to: TxKind::Call(Address::ZERO),
595 value: U256::ZERO,
596 input: Bytes::default(),
597 };
598
599 let signature =
600 alloy::signers::Signature::from_scalars_and_parity(B256::ZERO, B256::ZERO, false);
601
602 let hash = B256::ZERO;
603
604 EVMTransaction::from(alloy::rpc::types::Transaction {
605 inner: Recovered::new_unchecked(
606 TxEnvelope::Legacy(Signed::new_unchecked(tx, signature, hash)),
607 Address::ZERO,
608 ),
609 block_hash: None,
610 block_number: None,
611 transaction_index: None,
612 effective_gas_price: None,
613 })
614 }
615
616 fn create_test_stellar_transaction() -> StellarTransaction {
617 StellarTransaction::from({
618 StellarTransactionInfo {
619 ..Default::default()
620 }
621 })
622 }
623
624 fn create_test_stellar_block() -> StellarBlock {
625 StellarBlock::default()
626 }
627
628 fn create_mock_monitor_match_from_path(
629 blockchain_type: BlockChainType,
630 script_path: Option<&str>,
631 ) -> MonitorMatch {
632 match blockchain_type {
633 BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
634 monitor: create_test_monitor("test", vec![], false, script_path),
635 transaction: create_test_evm_transaction(),
636 receipt: Some(create_test_evm_transaction_receipt()),
637 logs: Some(create_test_evm_logs()),
638 network_slug: "ethereum_mainnet".to_string(),
639 matched_on: MatchConditions {
640 functions: vec![],
641 events: vec![],
642 transactions: vec![],
643 },
644 matched_on_args: None,
645 })),
646 BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
647 monitor: create_test_monitor("test", vec![], false, script_path),
648 transaction: create_test_stellar_transaction(),
649 ledger: create_test_stellar_block(),
650 network_slug: "stellar_mainnet".to_string(),
651 matched_on: MatchConditions {
652 functions: vec![],
653 events: vec![],
654 transactions: vec![],
655 },
656 matched_on_args: None,
657 })),
658 BlockChainType::Midnight => unimplemented!(),
659 }
660 }
661
662 fn create_mock_monitor_match_from_monitor(
663 blockchain_type: BlockChainType,
664 monitor: Monitor,
665 ) -> MonitorMatch {
666 match blockchain_type {
667 BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
668 monitor,
669 transaction: create_test_evm_transaction(),
670 receipt: Some(create_test_evm_transaction_receipt()),
671 logs: Some(create_test_evm_logs()),
672 network_slug: "ethereum_mainnet".to_string(),
673 matched_on: MatchConditions {
674 functions: vec![],
675 events: vec![],
676 transactions: vec![],
677 },
678 matched_on_args: None,
679 })),
680 BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
681 monitor,
682 transaction: create_test_stellar_transaction(),
683 ledger: create_test_stellar_block(),
684 network_slug: "stellar_mainnet".to_string(),
685 matched_on: MatchConditions {
686 functions: vec![],
687 events: vec![],
688 transactions: vec![],
689 },
690 matched_on_args: None,
691 })),
692 BlockChainType::Midnight => unimplemented!(),
693 }
694 }
695
696 fn matches_equal(a: &MonitorMatch, b: &MonitorMatch) -> bool {
697 match (a, b) {
698 (MonitorMatch::EVM(a), MonitorMatch::EVM(b)) => a.monitor.name == b.monitor.name,
699 (MonitorMatch::Stellar(a), MonitorMatch::Stellar(b)) => {
700 a.monitor.name == b.monitor.name
701 }
702 _ => false,
703 }
704 }
705
706 #[test]
707 fn test_has_active_monitors() {
708 let monitors = vec![
709 create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
710 create_test_monitor("2", vec!["ethereum_sepolia"], false, None),
711 create_test_monitor(
712 "3",
713 vec!["ethereum_mainnet", "ethereum_sepolia"],
714 false,
715 None,
716 ),
717 create_test_monitor("4", vec!["stellar_mainnet"], true, None),
718 ];
719
720 assert!(has_active_monitors(
721 &monitors,
722 &"ethereum_mainnet".to_string()
723 ));
724 assert!(has_active_monitors(
725 &monitors,
726 &"ethereum_sepolia".to_string()
727 ));
728 assert!(!has_active_monitors(
729 &monitors,
730 &"midnight_mainnet".to_string()
731 ));
732 assert!(!has_active_monitors(
733 &monitors,
734 &"stellar_mainnet".to_string()
735 ));
736 }
737
738 #[test]
739 fn test_filter_active_monitors() {
740 let mut monitors = HashMap::new();
741 monitors.insert(
742 "1".to_string(),
743 create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
744 );
745 monitors.insert(
746 "2".to_string(),
747 create_test_monitor("2", vec!["stellar_mainnet"], true, None),
748 );
749 monitors.insert(
750 "3".to_string(),
751 create_test_monitor("3", vec!["ethereum_mainnet"], false, None),
752 );
753
754 let active_monitors = filter_active_monitors(monitors);
755 assert_eq!(active_monitors.len(), 2);
756 assert!(active_monitors.iter().all(|m| !m.paused));
757 }
758
759 #[test]
760 fn test_filter_network_monitors() {
761 let monitors = vec![
762 create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
763 create_test_monitor("2", vec!["stellar_mainnet"], true, None),
764 create_test_monitor(
765 "3",
766 vec!["ethereum_mainnet", "stellar_mainnet"],
767 false,
768 None,
769 ),
770 ];
771
772 let eth_monitors = filter_network_monitors(&monitors, &"ethereum_mainnet".to_string());
773 assert_eq!(eth_monitors.len(), 2);
774 assert!(eth_monitors
775 .iter()
776 .all(|m| m.networks.contains(&"ethereum_mainnet".to_string())));
777
778 let stellar_monitors = filter_network_monitors(&monitors, &"stellar_mainnet".to_string());
779 assert_eq!(stellar_monitors.len(), 2);
780 assert!(stellar_monitors
781 .iter()
782 .all(|m| m.networks.contains(&"stellar_mainnet".to_string())));
783
784 let midnight_monitors = filter_network_monitors(&monitors, &"midnight_mainnet".to_string());
785 assert!(midnight_monitors.is_empty());
786 }
787
788 #[tokio::test]
789 async fn test_run_trigger_filters_empty_matches() {
790 let matches: Vec<MonitorMatch> = vec![];
792
793 let mut trigger_scripts = HashMap::new();
795 trigger_scripts.insert(
796 "monitor_test-test.py".to_string(), (
798 ScriptLanguage::Python,
799 r#"
800import sys
801import json
802
803input_data = sys.stdin.read()
804data = json.loads(input_data)
805print(False)
806 "#
807 .to_string(),
808 ),
809 );
810
811 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
813 assert!(filtered.is_empty());
814 }
815
816 #[tokio::test]
817 async fn test_run_trigger_filters_true_condition() {
818 let script_content = r#"
819import sys
820import json
821
822input_json = sys.argv[1]
823data = json.loads(input_json)
824print("debugging...")
825def test():
826 return True
827result = test()
828print(result)
829"#;
830 let temp_file = create_temp_script(script_content);
831 let mut trigger_scripts = HashMap::new();
832 trigger_scripts.insert(
833 format!("test-{}", temp_file.path().to_str().unwrap()),
834 (ScriptLanguage::Python, script_content.to_string()),
835 );
836 let match_item = create_mock_monitor_match_from_path(
837 BlockChainType::EVM,
838 Some(temp_file.path().to_str().unwrap()),
839 );
840 let matches = vec![match_item.clone()];
841
842 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
843 assert_eq!(filtered.len(), 1);
844 assert!(matches_equal(&filtered[0], &match_item));
845 }
846
847 #[tokio::test]
848 async fn test_run_trigger_filters_false_condition() {
849 let script_content = r#"
850import sys
851import json
852
853input_data = sys.stdin.read()
854data = json.loads(input_data)
855print("debugging...")
856def test():
857 return False
858result = test()
859print(result)
860"#;
861 let temp_file = create_temp_script(script_content);
862 let mut trigger_scripts = HashMap::new();
863 trigger_scripts.insert(
864 format!("test-{}", temp_file.path().to_str().unwrap()),
865 (ScriptLanguage::Python, script_content.to_string()),
866 );
867 let match_item = create_mock_monitor_match_from_path(
868 BlockChainType::EVM,
869 Some(temp_file.path().to_str().unwrap()),
870 );
871 let matches = vec![match_item.clone()];
872
873 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
874 assert_eq!(filtered.len(), 1);
875 }
876
877 #[tokio::test]
878 async fn test_execute_trigger_condition_returns_false() {
879 let script_content = r#"print(False) # Script returns false"#;
880 let temp_file = create_temp_script(script_content);
881 let trigger_condition = TriggerConditions {
882 language: ScriptLanguage::Python,
883 script_path: temp_file.path().to_str().unwrap().to_string(),
884 timeout_ms: 1000,
885 arguments: None,
886 };
887 let match_item = create_mock_monitor_match_from_path(
888 BlockChainType::EVM,
889 Some(temp_file.path().to_str().unwrap()),
890 );
891 let script_content = (ScriptLanguage::Python, script_content.to_string());
892
893 let result =
894 execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
895 assert!(!result); }
897
898 #[tokio::test]
899 async fn test_execute_trigger_condition_script_error() {
900 let script_content = r#"raise Exception("Test error") # Raise an error"#;
901 let temp_file = create_temp_script(script_content);
902 let trigger_condition = TriggerConditions {
903 language: ScriptLanguage::Python,
904 script_path: temp_file.path().to_str().unwrap().to_string(),
905 timeout_ms: 1000,
906 arguments: None,
907 };
908 let match_item = create_mock_monitor_match_from_path(
909 BlockChainType::EVM,
910 Some(temp_file.path().to_str().unwrap()),
911 );
912 let script_content = (ScriptLanguage::Python, script_content.to_string());
913
914 let result =
915 execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
916 assert!(!result); }
918
919 #[tokio::test]
920 async fn test_execute_trigger_condition_invalid_script() {
921 let trigger_condition = TriggerConditions {
922 language: ScriptLanguage::Python,
923 script_path: "non_existent_script.py".to_string(),
924 timeout_ms: 1000,
925 arguments: None,
926 };
927 let match_item = create_mock_monitor_match_from_path(
928 BlockChainType::EVM,
929 Some("non_existent_script.py"),
930 );
931 let script_content = (ScriptLanguage::Python, "invalid script content".to_string());
932
933 let result =
934 execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
935 assert!(!result); }
937
938 #[tokio::test]
939 async fn test_run_trigger_filters_multiple_conditions_keep_match() {
940 let monitor = MonitorBuilder::new()
942 .name("monitor_test")
943 .networks(vec!["ethereum_mainnet".to_string()])
944 .trigger_condition("test1.py", 1000, ScriptLanguage::Python, None)
945 .trigger_condition("test2.py", 1000, ScriptLanguage::Python, None)
946 .build();
947
948 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
950
951 let mut trigger_scripts = HashMap::new();
952 trigger_scripts.insert(
953 "monitor_test|test1.py".to_string(),
954 (
955 ScriptLanguage::Python,
956 r#"
957import sys
958import json
959
960input_data = sys.stdin.read()
961data = json.loads(input_data)
962print(True)
963"#
964 .to_string(),
965 ),
966 );
967 trigger_scripts.insert(
968 "monitor_test|test2.py".to_string(),
969 (
970 ScriptLanguage::Python,
971 r#"
972import sys
973import json
974input_data = sys.stdin.read()
975data = json.loads(input_data)
976print(True)
977"#
978 .to_string(),
979 ),
980 );
981
982 let matches = vec![match_item.clone()];
984 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
985
986 assert_eq!(filtered.len(), 0);
987 }
988
989 #[tokio::test]
990 async fn test_run_trigger_filters_condition_two_combinations_exclude_match() {
991 let monitor = MonitorBuilder::new()
992 .name("monitor_test")
993 .networks(vec!["ethereum_mainnet".to_string()])
994 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
995 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
996 .build();
997
998 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
999
1000 let mut trigger_scripts = HashMap::new();
1002 trigger_scripts.insert(
1003 "monitor_test|condition1.py".to_string(),
1004 (ScriptLanguage::Python, "print(True)".to_string()),
1005 );
1006 trigger_scripts.insert(
1007 "monitor_test|condition2.py".to_string(),
1008 (ScriptLanguage::Python, "print(False)".to_string()),
1009 );
1010
1011 let matches = vec![match_item.clone()];
1012 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1013 assert_eq!(filtered.len(), 0);
1014 }
1015
1016 #[tokio::test]
1017 async fn test_run_trigger_filters_condition_two_combinations_keep_match() {
1018 let monitor = MonitorBuilder::new()
1019 .name("monitor_test")
1020 .networks(vec!["ethereum_mainnet".to_string()])
1021 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1022 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1023 .build();
1024
1025 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1026
1027 let mut trigger_scripts = HashMap::new();
1028 trigger_scripts.insert(
1029 "monitor_test|condition1.py".to_string(),
1030 (ScriptLanguage::Python, "print(False)".to_string()),
1031 );
1032 trigger_scripts.insert(
1033 "monitor_test|condition2.py".to_string(),
1034 (ScriptLanguage::Python, "print(False)".to_string()),
1035 );
1036
1037 let matches = vec![match_item.clone()];
1038 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1039 assert_eq!(filtered.len(), 1);
1040 }
1041
1042 #[tokio::test]
1043 async fn test_run_trigger_filters_condition_two_combinations_exclude_match_last_condition() {
1044 let monitor = MonitorBuilder::new()
1045 .name("monitor_test")
1046 .networks(vec!["ethereum_mainnet".to_string()])
1047 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1048 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1049 .build();
1050
1051 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1052
1053 let mut trigger_scripts = HashMap::new();
1054 trigger_scripts.insert(
1055 "monitor_test|condition1.py".to_string(),
1056 (ScriptLanguage::Python, "print(False)".to_string()),
1057 );
1058 trigger_scripts.insert(
1059 "monitor_test|condition2.py".to_string(),
1060 (ScriptLanguage::Python, "print(True)".to_string()),
1061 );
1062
1063 let matches = vec![match_item.clone()];
1064 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1065 assert_eq!(filtered.len(), 0);
1066 }
1067
1068 #[tokio::test]
1069 async fn test_run_trigger_filters_condition_three_combinations_exclude_match() {
1070 let monitor = MonitorBuilder::new()
1071 .name("monitor_test")
1072 .networks(vec!["ethereum_mainnet".to_string()])
1073 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1074 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1075 .trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1076 .build();
1077
1078 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1079
1080 let mut trigger_scripts = HashMap::new();
1081 trigger_scripts.insert(
1082 "monitor_test|condition1.py".to_string(),
1083 (ScriptLanguage::Python, "print(False)".to_string()),
1084 );
1085 trigger_scripts.insert(
1086 "monitor_test|condition2.py".to_string(),
1087 (ScriptLanguage::Python, "print(False)".to_string()),
1088 );
1089 trigger_scripts.insert(
1090 "monitor_test|condition3.py".to_string(),
1091 (ScriptLanguage::Python, "print(True)".to_string()),
1092 );
1093
1094 let matches = vec![match_item.clone()];
1095 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1096 assert_eq!(filtered.len(), 0);
1097 }
1098
1099 #[tokio::test]
1100 async fn test_run_trigger_filters_condition_three_combinations_keep_match() {
1101 let monitor = MonitorBuilder::new()
1102 .name("monitor_test")
1103 .networks(vec!["ethereum_mainnet".to_string()])
1104 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1105 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1106 .trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1107 .build();
1108
1109 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1110
1111 let mut trigger_scripts = HashMap::new();
1112 trigger_scripts.insert(
1113 "monitor_test|condition1.py".to_string(),
1114 (ScriptLanguage::Python, "print(False)".to_string()),
1115 );
1116 trigger_scripts.insert(
1117 "monitor_test|condition2.py".to_string(),
1118 (ScriptLanguage::Python, "print(False)".to_string()),
1119 );
1120 trigger_scripts.insert(
1121 "monitor_test|condition3.py".to_string(),
1122 (ScriptLanguage::Python, "print(False)".to_string()),
1123 );
1124
1125 let matches = vec![match_item.clone()];
1126 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1127 assert_eq!(filtered.len(), 1);
1128 }
1129
1130 #[tokio::test]
1132 async fn test_run_trigger_filters_stellar_empty_matches() {
1133 let matches: Vec<MonitorMatch> = vec![];
1134 let mut trigger_scripts = HashMap::new();
1135 trigger_scripts.insert(
1136 "monitor_test|test.py".to_string(),
1137 (
1138 ScriptLanguage::Python,
1139 r#"
1140import sys
1141import json
1142
1143input_data = sys.stdin.read()
1144data = json.loads(input_data)
1145print(False)
1146"#
1147 .to_string(),
1148 ),
1149 );
1150
1151 let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1152 assert!(filtered.is_empty());
1153 }
1154
1155 #[tokio::test]
1156 async fn test_run_trigger_filters_stellar_true_condition() {
1157 let script_content = r#"
1158import sys
1159import json
1160
1161input_json = sys.argv[1]
1162data = json.loads(input_json)
1163print("debugging...")
1164def test():
1165 return True
1166result = test()
1167print(result)
1168"#;
1169 let temp_file = create_temp_script(script_content);
1170 let mut trigger_scripts = HashMap::new();
1171 trigger_scripts.insert(
1172 format!("test|{}", temp_file.path().to_str().unwrap()),
1173 (ScriptLanguage::Python, script_content.to_string()),
1174 );
1175 let match_item = create_mock_monitor_match_from_path(
1176 BlockChainType::Stellar,
1177 Some(temp_file.path().to_str().unwrap()),
1178 );
1179 let matches = vec![match_item.clone()];
1180
1181 let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1182 assert_eq!(filtered.len(), 1);
1183 assert!(matches_equal(&filtered[0], &match_item));
1184 }
1185
1186 #[tokio::test]
1187 async fn test_run_trigger_filters_stellar_multiple_conditions() {
1188 let monitor = MonitorBuilder::new()
1189 .name("monitor_test")
1190 .networks(vec!["stellar_mainnet".to_string()])
1191 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1192 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1193 .build();
1194
1195 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::Stellar, monitor);
1196
1197 let mut trigger_scripts = HashMap::new();
1198 trigger_scripts.insert(
1199 "monitor_test|condition1.py".to_string(),
1200 (ScriptLanguage::Python, "print(False)".to_string()),
1201 );
1202 trigger_scripts.insert(
1203 "monitor_test|condition2.py".to_string(),
1204 (ScriptLanguage::Python, "print(True)".to_string()),
1205 );
1206
1207 let matches = vec![match_item.clone()];
1208 let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1209 assert_eq!(filtered.len(), 0); }
1211}