openzeppelin_monitor/services/blockchain/clients/midnight/
client.rs1use anyhow::Context;
7use async_trait::async_trait;
8use futures;
9use serde_json::json;
10use std::marker::PhantomData;
11use std::str::FromStr;
12use subxt::client::OnlineClient;
13use tracing::instrument;
14
15use crate::{
16 models::{BlockType, MidnightBlock, MidnightEvent, Network},
17 services::{
18 blockchain::{
19 client::BlockChainClient, transports::BlockchainTransport, BlockFilterFactory,
20 MidnightWsTransportClient,
21 },
22 filter::MidnightBlockFilter,
23 },
24};
25
26#[derive(Clone)]
35pub struct MidnightClient<
36 W: Send + Sync + Clone,
37 S: SubstrateClientTrait = OnlineClient<subxt::SubstrateConfig>,
38> {
39 ws_client: W,
41 substrate_client: S,
43}
44
45impl<W: Send + Sync + Clone, S: SubstrateClientTrait> MidnightClient<W, S> {
46 pub fn new_with_transport(ws_client: W, substrate_client: S) -> Self {
55 Self {
56 ws_client,
57 substrate_client,
58 }
59 }
60}
61
62impl MidnightClient<MidnightWsTransportClient, OnlineClient<subxt::SubstrateConfig>> {
63 pub async fn new(network: &Network) -> Result<Self, anyhow::Error> {
74 let ws_client = MidnightWsTransportClient::new(network, None).await?;
75 let substrate_client = OnlineClient::<subxt::SubstrateConfig>::from_insecure_url(
76 ws_client.get_current_url().await,
77 )
78 .await
79 .map_err(|e| anyhow::anyhow!("Failed to create subxt client: {}", e))?;
80 Ok(Self::new_with_transport(ws_client, substrate_client))
81 }
82}
83
84#[async_trait]
85impl<W: Send + Sync + Clone + BlockchainTransport> BlockFilterFactory<Self> for MidnightClient<W> {
86 type Filter = MidnightBlockFilter<Self>;
87 fn filter() -> Self::Filter {
88 MidnightBlockFilter {
89 _client: PhantomData,
90 }
91 }
92}
93
94#[async_trait]
99pub trait SubstrateClientTrait: Send + Sync + Clone {
100 async fn get_events_at(
108 &self,
109 block_hash: subxt::utils::H256,
110 ) -> Result<subxt::events::Events<subxt::SubstrateConfig>, subxt::Error>;
111
112 async fn get_finalized_block(
113 &self,
114 ) -> Result<
115 subxt::blocks::Block<subxt::SubstrateConfig, OnlineClient<subxt::SubstrateConfig>>,
116 subxt::Error,
117 >;
118}
119
120#[async_trait]
125impl SubstrateClientTrait for OnlineClient<subxt::SubstrateConfig> {
126 async fn get_events_at(
127 &self,
128 block_hash: subxt::utils::H256,
129 ) -> Result<subxt::events::Events<subxt::SubstrateConfig>, subxt::Error> {
130 self.events().at(block_hash).await
131 }
132
133 async fn get_finalized_block(
134 &self,
135 ) -> Result<
136 subxt::blocks::Block<subxt::SubstrateConfig, OnlineClient<subxt::SubstrateConfig>>,
137 subxt::Error,
138 > {
139 self.blocks().at_latest().await
140 }
141}
142
143#[async_trait]
148pub trait MidnightClientTrait {
149 async fn get_events(
161 &self,
162 start_block: u64,
163 end_block: Option<u64>,
164 ) -> Result<Vec<MidnightEvent>, anyhow::Error>;
165
166 async fn get_chain_type(&self) -> Result<String, anyhow::Error>;
174}
175
176#[async_trait]
177impl<W: Send + Sync + Clone + BlockchainTransport, S: SubstrateClientTrait> MidnightClientTrait
178 for MidnightClient<W, S>
179{
180 #[instrument(skip(self), fields(start_block, end_block))]
183 async fn get_events(
184 &self,
185 start_block: u64,
186 end_block: Option<u64>,
187 ) -> Result<Vec<MidnightEvent>, anyhow::Error> {
188 let end_block = end_block.unwrap_or(start_block);
189 if start_block > end_block {
190 return Err(anyhow::anyhow!(
191 "start_block {} cannot be greater than end_block {}",
192 start_block,
193 end_block
194 ));
195 }
196 let block_range = start_block..=end_block;
197
198 let block_hashes = futures::future::join_all(block_range.clone().map(|block_number| {
200 let client = self.ws_client.clone();
201 async move {
202 let params = json!([format!("0x{:x}", block_number)]);
203 let response = client
204 .send_raw_request("chain_getBlockHash", Some(params))
205 .await
206 .with_context(|| format!("Failed to get block hash for: {}", block_number))?;
207
208 let hash_str = response
209 .get("result")
210 .and_then(|v| v.as_str())
211 .ok_or_else(|| anyhow::anyhow!("Missing 'result' field"))?;
212
213 subxt::utils::H256::from_str(hash_str)
214 .map_err(|e| anyhow::anyhow!("Failed to parse block hash: {}", e))
215 }
216 }))
217 .await
218 .into_iter()
219 .collect::<Result<Vec<_>, _>>()?;
220
221 let raw_events = futures::future::join_all(block_hashes.into_iter().map(|block_hash| {
223 let client = self.substrate_client.clone();
224 async move {
225 client
226 .get_events_at(block_hash)
227 .await
228 .map_err(|e| anyhow::anyhow!("Failed to get events: {}", e))
229 }
230 }))
231 .await
232 .into_iter()
233 .collect::<Result<Vec<_>, _>>()?;
234
235 let decoded_events =
237 futures::future::join_all(raw_events.into_iter().map(|block_events| {
238 let client = self.ws_client.clone();
239 async move {
240 let event_bytes = block_events.bytes();
241 let params = json!([hex::encode(event_bytes)]);
242 let response = client
243 .send_raw_request("midnight_decodeEvents", Some(params))
244 .await?;
245
246 let response_result = response
247 .get("result")
248 .and_then(|v| v.as_array())
249 .ok_or_else(|| anyhow::anyhow!("Missing 'result' field"))?;
250
251 Ok::<Vec<MidnightEvent>, anyhow::Error>(
252 response_result
253 .iter()
254 .map(|v| MidnightEvent::from(v.clone()))
255 .collect(),
256 )
257 }
258 }))
259 .await
260 .into_iter()
261 .collect::<Result<Vec<_>, _>>()?
262 .into_iter()
263 .flatten()
264 .collect();
265
266 Ok(decoded_events)
267 }
268
269 #[instrument(skip(self))]
271 async fn get_chain_type(&self) -> Result<String, anyhow::Error> {
272 let response = self
273 .ws_client
274 .send_raw_request::<serde_json::Value>("system_chain", None)
275 .await
276 .with_context(|| "Failed to get chain type")?;
277
278 response
279 .get("result")
280 .and_then(|v| v.as_str())
281 .map(|s| s.to_string())
282 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'result' field"))
283 }
284}
285
286#[async_trait]
287impl<W: Send + Sync + Clone + BlockchainTransport, S: SubstrateClientTrait> BlockChainClient
288 for MidnightClient<W, S>
289{
290 #[instrument(skip(self))]
299 async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
300 let response = self
302 .ws_client
303 .send_raw_request::<serde_json::Value>("chain_getFinalisedHead", None)
304 .await
305 .with_context(|| "Failed to get latest block number")?;
306
307 let finalised_block_hash = response
308 .get("result")
309 .and_then(|v| v.as_str())
310 .ok_or_else(|| anyhow::anyhow!("Missing 'result' field"))?;
311
312 let params = json!([finalised_block_hash]);
313
314 let response = self
315 .ws_client
316 .send_raw_request::<serde_json::Value>("chain_getHeader", Some(params))
317 .await
318 .with_context(|| "Failed to get latest block number")?;
319
320 let hex_str = response
322 .get("result")
323 .and_then(|v| v.get("number"))
324 .and_then(|v| v.as_str())
325 .ok_or_else(|| anyhow::anyhow!("Missing block number in response"))?;
326
327 u64::from_str_radix(hex_str.trim_start_matches("0x"), 16)
329 .map_err(|e| anyhow::anyhow!("Failed to parse block number: {}", e))
330 }
331
332 #[instrument(skip(self), fields(start_block, end_block))]
344 async fn get_blocks(
345 &self,
346 start_block: u64,
347 end_block: Option<u64>,
348 ) -> Result<Vec<BlockType>, anyhow::Error> {
349 let end_block = end_block.unwrap_or(start_block);
350 if start_block > end_block {
351 return Err(anyhow::anyhow!(
352 "start_block {} cannot be greater than end_block {}",
353 start_block,
354 end_block
355 ));
356 }
357
358 let block_futures: Vec<_> = (start_block..=end_block)
359 .map(|block_number| {
360 let params = json!([format!("0x{:x}", block_number)]);
361 let client = self.ws_client.clone();
362
363 async move {
364 let response = client
365 .send_raw_request("chain_getBlockHash", Some(params))
366 .await
367 .with_context(|| {
368 format!("Failed to get block hash for: {}", block_number)
369 })?;
370
371 let block_hash = response
372 .get("result")
373 .and_then(|v| v.as_str())
374 .ok_or_else(|| anyhow::anyhow!("Missing 'result' field"))?;
375
376 let params = json!([block_hash]);
377
378 let response = client
379 .send_raw_request("midnight_jsonBlock", Some(params))
380 .await
381 .with_context(|| format!("Failed to get block: {}", block_number))?;
382
383 let block_data = response
384 .get("result")
385 .ok_or_else(|| anyhow::anyhow!("Missing 'result' field"))?;
386
387 let block_value: serde_json::Value = serde_json::from_str(
389 block_data
390 .as_str()
391 .ok_or_else(|| anyhow::anyhow!("Result is not a string"))?,
392 )
393 .with_context(|| "Failed to parse block JSON string")?;
394
395 if block_value.is_null() {
396 return Err(anyhow::anyhow!("Block not found"));
397 }
398
399 let block: MidnightBlock = serde_json::from_value(block_value.clone())
400 .map_err(|e| anyhow::anyhow!("Failed to parse block: {}", e))?;
401
402 Ok(BlockType::Midnight(Box::new(block)))
403 }
404 })
405 .collect();
406
407 futures::future::join_all(block_futures)
408 .await
409 .into_iter()
410 .collect::<Result<Vec<_>, _>>()
411 }
412}