1#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
/// Items textually included from the `bindings.rs` file located in the build's `OUT_DIR`.
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
20
21use bindings::*;
22use std::cell::Cell;
23use std::os::raw::c_int;
24use std::time::{Duration, Instant};
25
26pub mod testing;
27
28include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
29include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
30
/// Shorter alias for the bindings' archive source-location type.
pub type SourceLocation = bindings::aeron_archive_source_location_t;
/// Re-export of the `AERON_ARCHIVE_SOURCE_LOCATION_LOCAL` value under a shorter name.
pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
/// Re-export of the `AERON_ARCHIVE_SOURCE_LOCATION_REMOTE` value under a shorter name.
pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
36
37pub struct NoOpAeronIdleStrategyFunc;
38
39impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
40 fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
41}
42
43pub struct RecordingPos;
44impl RecordingPos {
45 pub fn find_counter_id_by_session(
46 counter_reader: &AeronCountersReader,
47 session_id: i32,
48 ) -> i32 {
49 unsafe {
50 aeron_archive_recording_pos_find_counter_id_by_session_id(
51 counter_reader.get_inner(),
52 session_id,
53 )
54 }
55 }
56 pub fn find_counter_id_by_recording_id(
57 counter_reader: &AeronCountersReader,
58 recording_id: i64,
59 ) -> i32 {
60 unsafe {
61 aeron_archive_recording_pos_find_counter_id_by_recording_id(
62 counter_reader.get_inner(),
63 recording_id,
64 )
65 }
66 }
67
68 pub fn get_recording_id_block(
71 counters_reader: &AeronCountersReader,
72 counter_id: i32,
73 wait: Duration,
74 ) -> Result<i64, AeronCError> {
75 let mut result = Self::get_recording_id(counters_reader, counter_id);
76 let instant = Instant::now();
77
78 while result.is_err() && instant.elapsed() < wait {
79 result = Self::get_recording_id(counters_reader, counter_id);
80 #[cfg(debug_assertions)]
81 std::thread::sleep(Duration::from_millis(10));
82 }
83
84 return result;
85 }
86
87 pub fn get_recording_id(
90 counters_reader: &AeronCountersReader,
91 counter_id: i32,
92 ) -> Result<i64, AeronCError> {
93 pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
96
97 pub const RECORD_ALLOCATED: i32 = 1;
99
100 pub const NULL_RECORDING_ID: i64 = -1;
102
103 if counter_id < 0 {
104 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
105 }
106
107 let state = counters_reader.counter_state(counter_id)?;
108 if state != RECORD_ALLOCATED {
109 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
110 }
111
112 let type_id = counters_reader.counter_type_id(counter_id)?;
113 if type_id != RECORDING_POSITION_TYPE_ID {
114 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
115 }
116
117 let recording_id = Cell::new(-1);
123 counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
124 if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
125 let mut val = [0u8; 8];
126 val.copy_from_slice(&key[0..8]);
127 let Ok(value) = i64::from_le_bytes(val).try_into();
128 recording_id.set(value);
129 }
130 });
131 let recording_id = recording_id.get();
132 if recording_id < 0 {
133 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
134 }
135
136 Ok(recording_id)
137 }
138}
139
140unsafe extern "C" fn default_encoded_credentials(
141 _clientd: *mut std::os::raw::c_void,
142) -> *mut aeron_archive_encoded_credentials_t {
143 let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
145 data: std::ptr::null(),
146 length: 0,
147 });
148 Box::into_raw(empty_credentials)
149}
150
151impl AeronArchive {
152 pub fn aeron(&self) -> Aeron {
153 self.get_archive_context().get_aeron()
154 }
155}
156
157impl AeronArchiveAsyncConnect {
158 #[inline]
159 pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
161 let resource_async = Self::new(ctx)?;
162 resource_async.inner.add_dependency(aeron.clone());
163 Ok(resource_async)
164 }
165}
166
167macro_rules! impl_archive_position_methods {
168 ($pub_type:ty) => {
169 impl $pub_type {
170 pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
173 if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
174 let counter_reader = &aeron.counters_reader();
175 self.get_archive_position_with(counter_reader)
176 } else {
177 Err(AeronCError::from_code(-1))
178 }
179 }
180
181 pub fn get_archive_position_with(
184 &self,
185 counters: &AeronCountersReader,
186 ) -> Result<i64, AeronCError> {
187 let session_id = self.get_constants()?.session_id();
188 let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
189 if counter_id < 0 {
190 return Err(AeronCError::from_code(counter_id));
191 }
192 let position = counters.get_counter_value(counter_id);
193 if position < 0 {
194 return Err(AeronCError::from_code(position as i32));
195 }
196 Ok(position)
197 }
198
199 pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
202 let archive_position = self.get_archive_position().unwrap_or(-1);
203 if archive_position < 0 {
204 return false;
205 }
206 self.position() - archive_position <= length_inclusive as i64
207 }
208 }
209 };
210}
211
212impl_archive_position_methods!(AeronPublication);
213impl_archive_position_methods!(AeronExclusivePublication);
214
215impl AeronArchiveContext {
216 pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
220 self.set_credentials_supplier(
221 Some(default_encoded_credentials),
222 None,
223 None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
224 )
225 }
226
227 pub fn new_with_no_credentials_supplier(
231 aeron: &Aeron,
232 request_control_channel: &str,
233 response_control_channel: &str,
234 recording_events_channel: &str,
235 ) -> Result<AeronArchiveContext, AeronCError> {
236 let context = Self::new()?;
237 context.set_no_credentials_supplier()?;
238 context.set_aeron(aeron)?;
239 context.set_control_request_channel(&request_control_channel.into_c_string())?;
240 context.set_control_response_channel(&response_control_channel.into_c_string())?;
241 context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
242 context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
244 Ok(context)
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251 use log::{error, info};
252
253 use crate::testing::EmbeddedArchiveMediaDriverProcess;
254 use serial_test::serial;
255 use std::cell::Cell;
256 use std::error;
257 use std::error::Error;
258 use std::str::FromStr;
259 use std::sync::atomic::{AtomicBool, Ordering};
260 use std::sync::Arc;
261 use std::thread::{sleep, JoinHandle};
262 use std::time::{Duration, Instant};
263
264 #[derive(Default, Debug)]
265 struct ErrorCount {
266 error_count: usize,
267 }
268
269 impl AeronErrorHandlerCallback for ErrorCount {
270 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
271 error!("Aeron error {}: {}", error_code, msg);
272 self.error_count += 1;
273 }
274 }
275
// Channels handed to the embedded archive media driver in `test_simple_replay_merge`.
pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
pub const ARCHIVE_RECORDING_EVENTS: &str =
    "aeron:udp?control-mode=dynamic|control=localhost:8012";
280
/// Checks the URI strings produced by `AeronUriStringBuilder` for a reused builder,
/// a builder created via `from_str`, and one created via `str::parse`.
#[test]
fn test_uri_string_builder() -> Result<(), AeronCError> {
    let uri_builder = AeronUriStringBuilder::default();

    // Case 1: media, MTU and an initial position.
    uri_builder.init_new()?;
    uri_builder
        .media(Media::Udp)?
        .mtu_length(1024 * 64)?
        .set_initial_position(127424949617280, 1182294755, 65536)?;
    let with_initial_position = uri_builder.build(1024)?;
    assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", with_initial_position);

    // Case 2: the same builder re-initialised with a different set of parameters.
    uri_builder.init_new()?;
    let with_control = uri_builder
        .media(Media::Udp)?
        .control_mode(ControlMode::Dynamic)?
        .reliable(false)?
        .ttl(2)?
        .endpoint("localhost:1235")?
        .control("localhost:1234")?
        .build(1024)?;
    assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", with_control);

    // Case 3: start from an existing URI via `FromStr::from_str`.
    let from_existing = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
        .ttl(5)?
        .build(1024)?;

    assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", from_existing);

    // Case 4: round-trip the previous result through `str::parse`.
    let reparsed = from_existing
        .parse::<AeronUriStringBuilder>()?
        .ttl(6)?
        .build(1024)?;

    assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", reparsed);

    Ok(())
}
316
// Stream id used by the replay-merge test and by the error-path tests below.
pub const STREAM_ID: i32 = 1033;
// Prefix of every message payload sent by the replay-merge publisher thread.
pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
// Endpoints used to build the replay-merge publication, recording, live and replay channels.
pub const CONTROL_ENDPOINT: &str = "localhost:23265";
pub const RECORDING_ENDPOINT: &str = "localhost:23266";
pub const LIVE_ENDPOINT: &str = "localhost:23267";
pub const REPLAY_ENDPOINT: &str = "localhost:0";
/// Replay-merge integration test.
///
/// Starts an embedded archive media driver, publishes messages from a background thread
/// while they are being recorded, then connects a second client that replays the
/// recording and merges into the live stream.
#[test]
#[serial]
fn test_simple_replay_merge() -> Result<(), AeronCError> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
        .expect("failed to kill all java processes");

    assert!(is_udp_port_available(23265));
    assert!(is_udp_port_available(23266));
    assert!(is_udp_port_available(23267));
    assert!(is_udp_port_available(23268));
    let id = Aeron::nano_clock();
    let aeron_dir = format!("target/aeron/{}/shm", id);

    info!("starting archive media driver");
    // The archive directory is placed underneath the driver directory.
    let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
        &aeron_dir,
        &format!("{}/archive", aeron_dir),
        ARCHIVE_CONTROL_REQUEST,
        ARCHIVE_CONTROL_RESPONSE,
        ARCHIVE_RECORDING_EVENTS,
    )
    .expect("Failed to start embedded media driver");

    info!("connecting to archive");
    let (archive, aeron) = media_driver
        .archive_connect()
        .expect("Could not connect to archive client");

    // Cleared once the subscriber has merged, to stop the publisher thread.
    let running = Arc::new(AtomicBool::new(true));

    info!("connected to archive, adding publication");
    assert!(!aeron.is_closed());

    let (session_id, publisher_thread) =
        reply_merge_publisher(&archive, aeron.clone(), running.clone())?;

    {
        // The subscriber side uses its own client and its own archive connection,
        // configured with the channels of the first connection's context.
        let context = AeronContext::new()?;
        context.set_dir(&media_driver.aeron_dir)?;
        let error_handler = Handler::leak(ErrorCount::default());
        context.set_error_handler(Some(&error_handler))?;
        let aeron = Aeron::new(&context)?;
        aeron.start()?;
        let aeron_archive_context = archive.get_archive_context();
        let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
            &aeron,
            aeron_archive_context.get_control_request_channel(),
            aeron_archive_context.get_control_response_channel(),
            aeron_archive_context.get_recording_events_channel(),
        )?;
        aeron_archive_context.set_error_handler(Some(&error_handler))?;
        let archive = AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
            .poll_blocking(Duration::from_secs(30))
            .expect("failed to connect to archive");
        replay_merge_subscription(&archive, aeron.clone(), session_id)?;
    }

    running.store(false, Ordering::Release);
    publisher_thread.join().unwrap();

    Ok(())
}
393
394 fn reply_merge_publisher(
395 archive: &AeronArchive,
396 aeron: Aeron,
397 running: Arc<AtomicBool>,
398 ) -> Result<(i32, JoinHandle<()>), AeronCError> {
399 let publication = aeron.add_publication(
400 &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
402 .into_c_string(),
403 STREAM_ID,
404 Duration::from_secs(5),
405 )?;
406
407 info!(
408 "publication {} [status={:?}]",
409 publication.channel(),
410 publication.channel_status()
411 );
412 assert_eq!(1, publication.channel_status());
413
414 let session_id = publication.session_id();
415 let recording_channel = format!(
416 "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
418 );
419 info!("recording channel {}", recording_channel);
420 archive.start_recording(
421 &recording_channel.into_c_string(),
422 STREAM_ID,
423 SOURCE_LOCATION_REMOTE,
424 true,
425 )?;
426
427 info!("waiting for publisher to be connected");
428 while !publication.is_connected() {
429 thread::sleep(Duration::from_millis(100));
430 }
431 info!("publisher to be connected");
432 let counters_reader = aeron.counters_reader();
433 let mut caught_up_count = 0;
434 let publisher_thread = thread::spawn(move || {
435 let mut message_count = 0;
436
437 while running.load(Ordering::Acquire) {
438 let message = format!("{}{}", MESSAGE_PREFIX, message_count);
439 while publication.offer(
440 message.as_bytes(),
441 Handlers::no_reserved_value_supplier_handler(),
442 ) <= 0
443 {
444 thread::sleep(Duration::from_millis(10));
445 }
446 message_count += 1;
447 if message_count % 10_000 == 0 {
448 info!(
449 "Published {} messages [position={}]",
450 message_count,
451 publication.position()
452 );
453 }
454 if message_count > 10_000 {
456 while !publication.is_archive_position_with(0) {
458 thread::sleep(Duration::from_micros(300));
459 }
460 caught_up_count += 1;
461 }
462 }
463 assert!(caught_up_count > 0);
464 info!("Publisher thread terminated");
465 });
466 Ok((session_id, publisher_thread))
467 }
468
469 fn replay_merge_subscription(
470 archive: &AeronArchive,
471 aeron: Aeron,
472 session_id: i32,
473 ) -> Result<(), AeronCError> {
474 let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
476 info!("replay channel {:?}", replay_channel);
477
478 let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
479 info!("replay destination {:?}", replay_destination);
480
481 let live_destination =
482 format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
483 .into_c_string();
484 info!("live destination {:?}", live_destination);
485
486 let counters_reader = aeron.counters_reader();
487 let mut counter_id = -1;
488
489 while counter_id < 0 {
490 counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
491 }
492 info!(
493 "counter id {} {:?}",
494 counter_id,
495 counters_reader.get_counter_label(counter_id, 1024)
496 );
497 info!(
498 "counter id {} position={:?}",
499 counter_id,
500 counters_reader.get_counter_value(counter_id)
501 );
502
503 let start_position = 0;
525 let recording_id = RecordingPos::get_recording_id_block(
526 &aeron.counters_reader(),
527 counter_id,
528 Duration::from_secs(5),
529 )?;
530
531 let subscribe_channel =
532 format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
533 info!("subscribe channel {:?}", subscribe_channel);
534 let subscription = aeron.add_subscription(
535 &subscribe_channel,
536 STREAM_ID,
537 Handlers::no_available_image_handler(),
538 Handlers::no_unavailable_image_handler(),
539 Duration::from_secs(5),
540 )?;
541
542 let replay_merge = AeronArchiveReplayMerge::new(
543 &subscription,
544 &archive,
545 &replay_channel,
546 &replay_destination,
547 &live_destination,
548 recording_id,
549 start_position,
550 Aeron::epoch_clock(),
551 10_000,
552 )?;
553
554 info!(
555 "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
556 recording_id,
557 start_position,
558 subscribe_channel,
559 &replay_channel,
560 &replay_destination,
561 &live_destination
562 );
563
564 info!(
575 "about to start_replay [maxRecordPosition={:?}]",
576 archive.get_max_recorded_position(recording_id)
577 );
578
579 let mut reply_count = 0;
580 while !replay_merge.is_merged() {
581 assert!(!replay_merge.has_failed());
582 if replay_merge.poll_once(
583 |buffer, _header| {
584 reply_count += 1;
585 if reply_count % 10_000 == 0 {
586 info!(
587 "replay-merge [count={}, isMerged={}, isLive={}]",
588 reply_count,
589 replay_merge.is_merged(),
590 replay_merge.is_live_added()
591 );
592 }
593 },
594 100,
595 )? == 0
596 {
597 let err = archive.poll_for_error_response_as_string(4096)?;
598 if !err.is_empty() {
599 panic!("{}", err);
600 }
601 if Aeron::errmsg().len() > 0 && "no error" != Aeron::errmsg() {
602 panic!("{}", Aeron::errmsg());
603 }
604 archive.poll_for_recording_signals()?;
605 thread::sleep(Duration::from_millis(100));
606 }
607 }
608 assert!(!replay_merge.has_failed());
609 assert!(replay_merge.is_live_added());
610 assert!(reply_count > 10_000);
611 Ok(())
612 }
613
/// Asserts that the linked Aeron library reports the expected `major.minor.patch` version.
#[test]
fn version_check() {
    // SAFETY: assumes the version getters take no arguments and have no preconditions —
    // TODO confirm against the bindings.
    let (major, minor, patch) = unsafe {
        (
            crate::aeron_version_major(),
            crate::aeron_version_minor(),
            crate::aeron_version_patch(),
        )
    };

    let aeron_version = format!("{major}.{minor}.{patch}");

    let cargo_version = "1.48.5";
    assert_eq!(aeron_version, cargo_version);
}
625
626 use std::thread;
627
628 pub fn start_aeron_archive() -> Result<
629 (
630 Aeron,
631 AeronArchiveContext,
632 EmbeddedArchiveMediaDriverProcess,
633 ),
634 Box<dyn Error>,
635 > {
636 let id = Aeron::nano_clock();
637 let aeron_dir = format!("target/aeron/{}/shm", id);
638 let archive_dir = format!("target/aeron/{}/archive", id);
639
640 let request_port = find_unused_udp_port(8000).expect("Could not find port");
641 let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
642 let recording_event_port =
643 find_unused_udp_port(response_port + 1).expect("Could not find port");
644 let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
645 let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
646 let recording_events_channel =
647 &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
648 assert_ne!(request_control_channel, response_control_channel);
649
650 let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
651 &aeron_dir,
652 &archive_dir,
653 request_control_channel,
654 response_control_channel,
655 recording_events_channel,
656 )
657 .expect("Failed to start Java process");
658
659 let aeron_context = AeronContext::new()?;
660 aeron_context.set_dir(&aeron_dir.into_c_string())?;
661 aeron_context.set_client_name(&"test".into_c_string())?;
662 aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
663 AeronPublicationErrorFrameHandlerLogger,
664 )))?;
665 let error_handler = Handler::leak(ErrorCount::default());
666 aeron_context.set_error_handler(Some(&error_handler))?;
667 let aeron = Aeron::new(&aeron_context)?;
668 aeron.start()?;
669
670 let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
671 &aeron,
672 request_control_channel,
673 response_control_channel,
674 recording_events_channel,
675 )?;
676 archive_context.set_error_handler(Some(&error_handler))?;
677 Ok((aeron, archive_context, archive_media_driver))
678 }
679
/// End-to-end archive round trip.
///
/// Records 11 messages published on the IPC channel, waits for the archive to catch up,
/// stops the recording, looks it up via `list_recordings_for_uri_once`, replays it on a
/// separate stream id and asserts that exactly 11 fragments with the original payload
/// are received.
#[test]
#[serial]
pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();
    EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
        .expect("failed to kill all java processes");

    // `media_driver` stays bound so the driver handle lives until the test returns.
    let (aeron, archive_context, media_driver) = start_aeron_archive()?;

    assert!(!aeron.is_closed());

    info!("connected to aeron");

    let archive_connector =
        AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
    let archive = archive_connector
        .poll_blocking(Duration::from_secs(30))
        .expect("failed to connect to aeron archive media driver");

    assert!(archive.get_archive_id() > 0);

    let channel = AERON_IPC_STREAM;
    let stream_id = 10;

    let subscription_id =
        archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;

    assert!(subscription_id >= 0);
    info!("subscription id {}", subscription_id);

    let publication = aeron
        .async_add_exclusive_publication(channel, stream_id)?
        .poll_blocking(Duration::from_secs(5))?;

    for i in 0..11 {
        // Retry each offer until accepted, servicing the archive between attempts.
        while publication.offer(
            "123456".as_bytes(),
            Handlers::no_reserved_value_supplier_handler(),
        ) <= 0
        {
            sleep(Duration::from_millis(50));
            archive.poll_for_recording_signals()?;
            let err = archive.poll_for_error_response_as_string(4096)?;
            if !err.is_empty() {
                panic!("{}", err);
            }
            archive.idle();
        }
        info!("sent message {i} [test_aeron_archive]");
    }

    archive.idle();
    let session_id = publication.get_constants()?.session_id;
    info!("publication session id {}", session_id);
    let stop_position = publication.position();
    info!(
        "publication stop position {} [publication={:?}]",
        stop_position,
        publication.get_constants()
    );
    let counters_reader = aeron.counters_reader();
    info!("counters reader ready {:?}", counters_reader);

    let mut counter_id = -1;

    // Wait up to 5 s for the recording-position counter. Any non-negative id is
    // accepted, matching the assertion that follows.
    let start = Instant::now();
    while counter_id < 0 && start.elapsed() < Duration::from_secs(5) {
        counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
        info!("counter id {}", counter_id);
    }

    assert!(counter_id >= 0);

    info!("counter id {counter_id}, session id {session_id}");
    // Wait until the archive has recorded everything that was published.
    while counters_reader.get_counter_value(counter_id) < stop_position {
        info!(
            "current archive publication stop position {}",
            counters_reader.get_counter_value(counter_id)
        );
        sleep(Duration::from_millis(50));
    }
    info!(
        "found archive publication stop position {}",
        counters_reader.get_counter_value(counter_id)
    );

    archive.stop_recording_channel_and_stream(channel, stream_id)?;
    drop(publication);

    info!("list recordings");
    // `Cell`s let the descriptor callback report the recording it found.
    let found_recording_id = Cell::new(-1);
    let start_pos = Cell::new(-1);
    let end_pos = Cell::new(-1);
    let start = Instant::now();
    while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
        let mut count = 0;
        archive.list_recordings_for_uri_once(
            &mut count,
            0,
            i32::MAX,
            channel,
            stream_id,
            |d: AeronArchiveRecordingDescriptor| {
                assert_eq!(d.stream_id, stream_id);
                info!("found recording {:#?}", d);
                info!(
                    "strippedChannel={}, originalChannel={}",
                    d.stripped_channel(),
                    d.original_channel()
                );
                // Only accept a recording that has a stop position past its start.
                if d.stop_position > d.start_position && d.stop_position > 0 {
                    found_recording_id.set(d.recording_id);
                    start_pos.set(d.start_position);
                    end_pos.set(d.stop_position);
                }

                // A cloned descriptor must compare equal to the original.
                let copy = d.clone_struct();
                assert_eq!(copy.deref(), d.deref());
                assert_eq!(copy.recording_id, d.recording_id);
                assert_eq!(copy.control_session_id, d.control_session_id);
                assert_eq!(copy.mtu_length, d.mtu_length);
                assert_eq!(copy.source_identity_length, d.source_identity_length);
            },
        )?;
        archive.poll_for_recording_signals()?;
        let err = archive.poll_for_error_response_as_string(4096)?;
        if !err.is_empty() {
            panic!("{}", err);
        }
    }
    assert!(start.elapsed() < Duration::from_secs(5));
    info!("start replay");
    let params = AeronArchiveReplayParams::new(
        0,
        i32::MAX,
        start_pos.get(),
        end_pos.get() - start_pos.get(),
        0,
        0,
    )?;
    info!("replay params {:#?}", params);
    let replay_stream_id = 45;
    let replay_session_id =
        archive.start_replay(found_recording_id.get(), channel, replay_stream_id, &params)?;
    let session_id = replay_session_id as i32;

    info!("replay session id {}", replay_session_id);
    info!("session id {}", session_id);
    let channel_replay =
        format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
    info!("archive id: {}", archive.get_archive_id());

    info!("add subscription {:?}", channel_replay);
    let subscription = aeron
        .async_add_subscription(
            &channel_replay,
            replay_stream_id,
            Some(&Handler::leak(AeronAvailableImageLogger)),
            Some(&Handler::leak(AeronUnavailableImageLogger)),
        )?
        .poll_blocking(Duration::from_secs(10))?;

    /// Fragment handler that checks each payload and counts the fragments it sees.
    #[derive(Default)]
    struct FragmentHandler {
        count: Cell<usize>,
    }

    impl AeronFragmentHandlerCallback for FragmentHandler {
        fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
            assert_eq!(buffer, "123456".as_bytes());

            self.count.set(self.count.get() + 1);
        }
    }

    let poll = Handler::leak(FragmentHandler::default());

    // Poll until the first call that delivers fragments, or until 10 s have passed.
    let start = Instant::now();
    while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
    {
        let err = archive.poll_for_error_response_as_string(4096)?;
        if !err.is_empty() {
            panic!("{}", err);
        }
    }
    assert!(
        start.elapsed() < Duration::from_secs(10),
        "messages not received {:?}",
        poll.count
    );
    info!("aeron {:?}", aeron);
    info!("ctx {:?}", archive_context);
    assert_eq!(11, poll.count.get());
    Ok(())
}
881
/// `start_recording` must report an error for a channel string that is not a valid URI.
#[test]
#[serial]
fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
    let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
    let connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
    let archive = connector
        .poll_blocking(Duration::from_secs(30))
        .expect("failed to connect to archive");

    let bad_channel = "invalid:channel".into_c_string();
    let outcome = archive.start_recording(&bad_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
    assert!(
        outcome.is_err(),
        "Expected error when starting recording with an invalid channel"
    );
    Ok(())
}
901
/// Stopping a recording on a channel that was never recorded must report an error.
#[test]
#[serial]
fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
    let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
    let connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
    let archive = connector
        .poll_blocking(Duration::from_secs(30))
        .expect("failed to connect to archive");

    let unrecorded_channel = "aeron:udp?endpoint=localhost:9999".into_c_string();
    let outcome = archive.stop_recording_channel_and_stream(&unrecorded_channel, STREAM_ID);
    assert!(
        outcome.is_err(),
        "Expected error when stopping recording on a non-existent channel"
    );
    Ok(())
}
920
/// `start_replay` must report an error when asked to replay a recording id that does
/// not exist.
#[test]
#[serial]
fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
    let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
    let archive_connector =
        AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
    let archive = archive_connector
        .poll_blocking(Duration::from_secs(30))
        .expect("failed to connect to archive");

    let invalid_recording_id = -999;
    let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
    let result = archive.start_replay(
        invalid_recording_id,
        &"aeron:udp?endpoint=localhost:8888".into_c_string(),
        STREAM_ID,
        &params,
    );
    assert!(
        result.is_err(),
        "Expected error when starting replay with an invalid recording id"
    );
    Ok(())
}
945
/// After an archive connection is dropped, a fresh connection built from the same
/// context and client must succeed and report a positive archive id.
#[test]
#[serial]
fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
    let (aeron, archive_context, media_driver) = start_aeron_archive()?;

    // First connection: established, then closed by dropping it.
    let first_connector =
        AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
    let first_archive = first_connector
        .poll_blocking(Duration::from_secs(30))
        .expect("failed to connect to archive");

    drop(first_archive);

    // Second connection, using the original context directly.
    let second_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
    let new_archive = second_connector
        .poll_blocking(Duration::from_secs(30))
        .expect("failed to reconnect to archive");
    assert!(
        new_archive.get_archive_id() > 0,
        "Reconnected archive should have a valid archive id"
    );

    drop(media_driver);
    Ok(())
}
970}