1#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
/// Bindings module whose entire contents are included from `bindings.rs` in the
/// build's `OUT_DIR` (i.e. the file is produced during the build, not checked in here).
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
21
22use bindings::*;
23use std::cell::Cell;
24use std::os::raw::c_int;
25use std::time::{Duration, Instant};
26
27pub mod testing;
28
29include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
30include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
31
/// Alias for the archive source-location type exposed by the included bindings.
pub type SourceLocation = bindings::aeron_archive_source_location_t;
/// Shorthand for `AERON_ARCHIVE_SOURCE_LOCATION_LOCAL`.
pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
/// Shorthand for `AERON_ARCHIVE_SOURCE_LOCATION_REMOTE`.
pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
37
38pub struct NoOpAeronIdleStrategyFunc;
39
40impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
41 fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
42}
43
44pub struct RecordingPos;
45impl RecordingPos {
46 pub fn find_counter_id_by_session(
47 counter_reader: &AeronCountersReader,
48 session_id: i32,
49 ) -> i32 {
50 unsafe {
51 aeron_archive_recording_pos_find_counter_id_by_session_id(
52 counter_reader.get_inner(),
53 session_id,
54 )
55 }
56 }
57 pub fn find_counter_id_by_recording_id(
58 counter_reader: &AeronCountersReader,
59 recording_id: i64,
60 ) -> i32 {
61 unsafe {
62 aeron_archive_recording_pos_find_counter_id_by_recording_id(
63 counter_reader.get_inner(),
64 recording_id,
65 )
66 }
67 }
68
69 pub fn get_recording_id_block(
72 counters_reader: &AeronCountersReader,
73 counter_id: i32,
74 wait: Duration,
75 ) -> Result<i64, AeronCError> {
76 let mut result = Self::get_recording_id(counters_reader, counter_id);
77 let instant = Instant::now();
78
79 while result.is_err() && instant.elapsed() < wait {
80 result = Self::get_recording_id(counters_reader, counter_id);
81 #[cfg(debug_assertions)]
82 std::thread::sleep(Duration::from_millis(10));
83 }
84
85 return result;
86 }
87
88 pub fn get_recording_id(
91 counters_reader: &AeronCountersReader,
92 counter_id: i32,
93 ) -> Result<i64, AeronCError> {
94 pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
97
98 pub const RECORD_ALLOCATED: i32 = 1;
100
101 pub const NULL_RECORDING_ID: i64 = -1;
103
104 if counter_id < 0 {
105 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
106 }
107
108 let state = counters_reader.counter_state(counter_id)?;
109 if state != RECORD_ALLOCATED {
110 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
111 }
112
113 let type_id = counters_reader.counter_type_id(counter_id)?;
114 if type_id != RECORDING_POSITION_TYPE_ID {
115 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
116 }
117
118 let recording_id = Cell::new(-1);
124 counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
125 if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
126 let mut val = [0u8; 8];
127 val.copy_from_slice(&key[0..8]);
128 let Ok(value) = i64::from_le_bytes(val).try_into();
129 recording_id.set(value);
130 }
131 });
132 let recording_id = recording_id.get();
133 if recording_id < 0 {
134 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
135 }
136
137 Ok(recording_id)
138 }
139}
140
141unsafe extern "C" fn default_encoded_credentials(
142 _clientd: *mut std::os::raw::c_void,
143) -> *mut aeron_archive_encoded_credentials_t {
144 let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
146 data: std::ptr::null(),
147 length: 0,
148 });
149 Box::into_raw(empty_credentials)
150}
151
152impl AeronArchive {
153 pub fn aeron(&self) -> Aeron {
154 self.get_archive_context().get_aeron()
155 }
156}
157
158impl AeronArchiveAsyncConnect {
159 #[inline]
160 pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
162 let resource_async = Self::new(ctx)?;
163 resource_async.inner.add_dependency(aeron.clone());
164 Ok(resource_async)
165 }
166}
167
168macro_rules! impl_archive_position_methods {
169 ($pub_type:ty) => {
170 impl $pub_type {
171 pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
174 if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
175 let counter_reader = &aeron.counters_reader();
176 self.get_archive_position_with(counter_reader)
177 } else {
178 Err(AeronCError::from_code(-1))
179 }
180 }
181
182 pub fn get_archive_position_with(
185 &self,
186 counters: &AeronCountersReader,
187 ) -> Result<i64, AeronCError> {
188 let session_id = self.get_constants()?.session_id();
189 let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
190 if counter_id < 0 {
191 return Err(AeronCError::from_code(counter_id));
192 }
193 let position = counters.get_counter_value(counter_id);
194 if position < 0 {
195 return Err(AeronCError::from_code(position as i32));
196 }
197 Ok(position)
198 }
199
200 pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
203 let archive_position = self.get_archive_position().unwrap_or(-1);
204 if archive_position < 0 {
205 return false;
206 }
207 self.position() - archive_position <= length_inclusive as i64
208 }
209 }
210 };
211}
212
213impl_archive_position_methods!(AeronPublication);
214impl_archive_position_methods!(AeronExclusivePublication);
215
216impl AeronArchiveContext {
217 pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
221 self.set_credentials_supplier(
222 Some(default_encoded_credentials),
223 None,
224 None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
225 )
226 }
227
228 pub fn new_with_no_credentials_supplier(
232 aeron: &Aeron,
233 request_control_channel: &str,
234 response_control_channel: &str,
235 recording_events_channel: &str,
236 ) -> Result<AeronArchiveContext, AeronCError> {
237 let context = Self::new()?;
238 context.set_no_credentials_supplier()?;
239 context.set_aeron(aeron)?;
240 context.set_control_request_channel(&request_control_channel.into_c_string())?;
241 context.set_control_response_channel(&response_control_channel.into_c_string())?;
242 context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
243 context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
245 Ok(context)
246 }
247}
248
249#[cfg(test)]
250mod tests {
251 use super::*;
252 use log::{error, info};
253
254 use crate::testing::EmbeddedArchiveMediaDriverProcess;
255 use serial_test::serial;
256 use std::cell::Cell;
257 use std::error;
258 use std::error::Error;
259 use std::str::FromStr;
260 use std::sync::atomic::{AtomicBool, Ordering};
261 use std::sync::Arc;
262 use std::thread::{sleep, JoinHandle};
263 use std::time::{Duration, Instant};
264
265 #[derive(Default, Debug)]
266 struct ErrorCount {
267 error_count: usize,
268 }
269
270 impl AeronErrorHandlerCallback for ErrorCount {
271 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
272 error!("Aeron error {}: {}", error_code, msg);
273 self.error_count += 1;
274 }
275 }
276
    /// Control request channel handed to the embedded archive driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
    /// Control response channel handed to the embedded archive driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
    /// Recording events channel handed to the embedded archive driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_RECORDING_EVENTS: &str =
        "aeron:udp?control-mode=dynamic|control=localhost:8012";
281
282 #[test]
283 fn test_uri_string_builder() -> Result<(), AeronCError> {
284 let builder = AeronUriStringBuilder::default();
285
286 builder.init_new()?;
287 builder
288 .media(Media::Udp)? .mtu_length(1024 * 64)?
290 .set_initial_position(127424949617280, 1182294755, 65536)?;
291 let uri = builder.build(1024)?;
292 assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);
293
294 builder.init_new()?;
295 let uri = builder
296 .media(Media::Udp)?
297 .control_mode(ControlMode::Dynamic)?
298 .reliable(false)?
299 .ttl(2)?
300 .endpoint("localhost:1235")?
301 .control("localhost:1234")?
302 .build(1024)?;
303 assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);
304
305 let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
306 .ttl(5)?
307 .build(1024)?;
308
309 assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);
310
311 let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;
312
313 assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);
314
315 Ok(())
316 }
317
    /// Stream id shared by the replay-merge publisher/subscriber and the error-path tests.
    pub const STREAM_ID: i32 = 1033;
    /// Prefix of every message sent by `reply_merge_publisher` (a running count is appended).
    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
    /// Control endpoint used in the publication, recording and live-destination channels.
    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
    /// Endpoint used in the recording channel passed to `start_recording`.
    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
    /// Endpoint used in the live destination of the replay merge.
    pub const LIVE_ENDPOINT: &str = "localhost:23267";
    /// Endpoint used in the replay destination of the replay merge.
    // NOTE(review): port 0 presumably asks for an ephemeral port — confirm.
    pub const REPLAY_ENDPOINT: &str = "localhost:0";
324 #[test]
327 #[serial]
328 fn test_simple_replay_merge() -> Result<(), AeronCError> {
329 let _ = env_logger::Builder::new()
330 .is_test(true)
331 .filter_level(log::LevelFilter::Info)
332 .try_init();
333
334 EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
335 .expect("failed to kill all java processes");
336
337 assert!(is_udp_port_available(23265));
338 assert!(is_udp_port_available(23266));
339 assert!(is_udp_port_available(23267));
340 assert!(is_udp_port_available(23268));
341 let id = Aeron::nano_clock();
342 let aeron_dir = format!("target/aeron/{}/shm", id);
343 let archive_dir = format!("target/aeron/{}/archive", id);
344
345 info!("starting archive media driver");
346 let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
347 &aeron_dir,
348 &format!("{}/archive", aeron_dir),
349 ARCHIVE_CONTROL_REQUEST,
350 ARCHIVE_CONTROL_RESPONSE,
351 ARCHIVE_RECORDING_EVENTS,
352 )
353 .expect("Failed to start embedded media driver");
354
355 info!("connecting to archive");
356 let (archive, aeron) = media_driver
357 .archive_connect()
358 .expect("Could not connect to archive client");
359
360 let running = Arc::new(AtomicBool::new(true));
361
362 info!("connected to archive, adding publication");
363 assert!(!aeron.is_closed());
364
365 let (session_id, publisher_thread) =
366 reply_merge_publisher(&archive, aeron.clone(), running.clone())?;
367
368 {
369 let context = AeronContext::new()?;
370 context.set_dir(&media_driver.aeron_dir)?;
371 let error_handler = Handler::leak(ErrorCount::default());
372 context.set_error_handler(Some(&error_handler))?;
373 let aeron = Aeron::new(&context)?;
374 aeron.start()?;
375 let aeron_archive_context = archive.get_archive_context();
376 let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
377 &aeron,
378 aeron_archive_context.get_control_request_channel(),
379 aeron_archive_context.get_control_response_channel(),
380 aeron_archive_context.get_recording_events_channel(),
381 )?;
382 aeron_archive_context.set_error_handler(Some(&error_handler))?;
383 let archive = AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
384 .poll_blocking(Duration::from_secs(30))
385 .expect("failed to connect to archive");
386 replay_merge_subscription(&archive, aeron.clone(), session_id)?;
387 }
388
389 running.store(false, Ordering::Release);
390 publisher_thread.join().unwrap();
391
392 Ok(())
393 }
394
395 fn reply_merge_publisher(
396 archive: &AeronArchive,
397 aeron: Aeron,
398 running: Arc<AtomicBool>,
399 ) -> Result<(i32, JoinHandle<()>), AeronCError> {
400 let publication = aeron.add_publication(
401 &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
403 .into_c_string(),
404 STREAM_ID,
405 Duration::from_secs(5),
406 )?;
407
408 info!(
409 "publication {} [status={:?}]",
410 publication.channel(),
411 publication.channel_status()
412 );
413 assert_eq!(1, publication.channel_status());
414
415 let session_id = publication.session_id();
416 let recording_channel = format!(
417 "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
419 );
420 info!("recording channel {}", recording_channel);
421 archive.start_recording(
422 &recording_channel.into_c_string(),
423 STREAM_ID,
424 SOURCE_LOCATION_REMOTE,
425 true,
426 )?;
427
428 info!("waiting for publisher to be connected");
429 while !publication.is_connected() {
430 thread::sleep(Duration::from_millis(100));
431 }
432 info!("publisher to be connected");
433 let counters_reader = aeron.counters_reader();
434 let mut caught_up_count = 0;
435 let publisher_thread = thread::spawn(move || {
436 let mut message_count = 0;
437
438 while running.load(Ordering::Acquire) {
439 let message = format!("{}{}", MESSAGE_PREFIX, message_count);
440 while publication.offer(
441 message.as_bytes(),
442 Handlers::no_reserved_value_supplier_handler(),
443 ) <= 0
444 {
445 thread::sleep(Duration::from_millis(10));
446 }
447 message_count += 1;
448 if message_count % 10_000 == 0 {
449 info!(
450 "Published {} messages [position={}]",
451 message_count,
452 publication.position()
453 );
454 }
455 if message_count > 10_000 {
457 while !publication.is_archive_position_with(0) {
459 thread::sleep(Duration::from_micros(300));
460 }
461 caught_up_count += 1;
462 }
463 }
464 assert!(caught_up_count > 0);
465 info!("Publisher thread terminated");
466 });
467 Ok((session_id, publisher_thread))
468 }
469
470 fn replay_merge_subscription(
471 archive: &AeronArchive,
472 aeron: Aeron,
473 session_id: i32,
474 ) -> Result<(), AeronCError> {
475 let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
477 info!("replay channel {:?}", replay_channel);
478
479 let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
480 info!("replay destination {:?}", replay_destination);
481
482 let live_destination =
483 format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
484 .into_c_string();
485 info!("live destination {:?}", live_destination);
486
487 let counters_reader = aeron.counters_reader();
488 let mut counter_id = -1;
489
490 while counter_id < 0 {
491 counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
492 }
493 info!(
494 "counter id {} {:?}",
495 counter_id,
496 counters_reader.get_counter_label(counter_id, 1024)
497 );
498 info!(
499 "counter id {} position={:?}",
500 counter_id,
501 counters_reader.get_counter_value(counter_id)
502 );
503
504 let start_position = 0;
526 let recording_id = RecordingPos::get_recording_id_block(
527 &aeron.counters_reader(),
528 counter_id,
529 Duration::from_secs(5),
530 )?;
531
532 let subscribe_channel =
533 format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
534 info!("subscribe channel {:?}", subscribe_channel);
535 let subscription = aeron.add_subscription(
536 &subscribe_channel,
537 STREAM_ID,
538 Handlers::no_available_image_handler(),
539 Handlers::no_unavailable_image_handler(),
540 Duration::from_secs(5),
541 )?;
542
543 let replay_merge = AeronArchiveReplayMerge::new(
544 &subscription,
545 &archive,
546 &replay_channel,
547 &replay_destination,
548 &live_destination,
549 recording_id,
550 start_position,
551 Aeron::epoch_clock(),
552 10_000,
553 )?;
554
555 info!(
556 "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
557 recording_id,
558 start_position,
559 subscribe_channel,
560 &replay_channel,
561 &replay_destination,
562 &live_destination
563 );
564
565 info!(
576 "about to start_replay [maxRecordPosition={:?}]",
577 archive.get_max_recorded_position(recording_id)
578 );
579
580 let mut reply_count = 0;
581 while !replay_merge.is_merged() {
582 assert!(!replay_merge.has_failed());
583 if replay_merge.poll_once(
584 |buffer, _header| {
585 reply_count += 1;
586 if reply_count % 10_000 == 0 {
587 info!(
588 "replay-merge [count={}, isMerged={}, isLive={}]",
589 reply_count,
590 replay_merge.is_merged(),
591 replay_merge.is_live_added()
592 );
593 }
594 },
595 100,
596 )? == 0
597 {
598 let err = archive.poll_for_error_response_as_string(4096)?;
599 if !err.is_empty() {
600 panic!("{}", err);
601 }
602 if Aeron::errmsg().len() > 0 && "no error" != Aeron::errmsg() {
603 panic!("{}", Aeron::errmsg());
604 }
605 archive.poll_for_recording_signals()?;
606 thread::sleep(Duration::from_millis(100));
607 }
608 }
609 assert!(!replay_merge.has_failed());
610 assert!(replay_merge.is_live_added());
611 assert!(reply_count > 10_000);
612 Ok(())
613 }
614
615 #[test]
616 fn version_check() {
617 let major = unsafe { crate::aeron_version_major() };
618 let minor = unsafe { crate::aeron_version_minor() };
619 let patch = unsafe { crate::aeron_version_patch() };
620
621 let aeron_version = format!("{}.{}.{}", major, minor, patch);
622
623 let cargo_version = "1.48.6";
624 assert_eq!(aeron_version, cargo_version);
625 }
626
627 use std::thread;
628
629 pub fn start_aeron_archive() -> Result<
630 (
631 Aeron,
632 AeronArchiveContext,
633 EmbeddedArchiveMediaDriverProcess,
634 ),
635 Box<dyn Error>,
636 > {
637 let id = Aeron::nano_clock();
638 let aeron_dir = format!("target/aeron/{}/shm", id);
639 let archive_dir = format!("target/aeron/{}/archive", id);
640
641 let request_port = find_unused_udp_port(8000).expect("Could not find port");
642 let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
643 let recording_event_port =
644 find_unused_udp_port(response_port + 1).expect("Could not find port");
645 let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
646 let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
647 let recording_events_channel =
648 &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
649 assert_ne!(request_control_channel, response_control_channel);
650
651 let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
652 &aeron_dir,
653 &archive_dir,
654 request_control_channel,
655 response_control_channel,
656 recording_events_channel,
657 )
658 .expect("Failed to start Java process");
659
660 let aeron_context = AeronContext::new()?;
661 aeron_context.set_dir(&aeron_dir.into_c_string())?;
662 aeron_context.set_client_name(&"test".into_c_string())?;
663 aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
664 AeronPublicationErrorFrameHandlerLogger,
665 )))?;
666 let error_handler = Handler::leak(ErrorCount::default());
667 aeron_context.set_error_handler(Some(&error_handler))?;
668 let aeron = Aeron::new(&aeron_context)?;
669 aeron.start()?;
670
671 let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
672 &aeron,
673 request_control_channel,
674 response_control_channel,
675 recording_events_channel,
676 )?;
677 archive_context.set_error_handler(Some(&error_handler))?;
678 Ok((aeron, archive_context, archive_media_driver))
679 }
680
681 #[test]
682 #[serial]
683 pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
684 let _ = env_logger::Builder::new()
685 .is_test(true)
686 .filter_level(log::LevelFilter::Info)
687 .try_init();
688 EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
689 .expect("failed to kill all java processes");
690
691 let (aeron, archive_context, media_driver) = start_aeron_archive()?;
692
693 assert!(!aeron.is_closed());
694
695 info!("connected to aeron");
696
697 let archive_connector =
698 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
699 let archive = archive_connector
700 .poll_blocking(Duration::from_secs(30))
701 .expect("failed to connect to aeron archive media driver");
702
703 assert!(archive.get_archive_id() > 0);
704
705 let channel = AERON_IPC_STREAM;
706 let stream_id = 10;
707
708 let subscription_id =
709 archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
710
711 assert!(subscription_id >= 0);
712 info!("subscription id {}", subscription_id);
713
714 let publication = aeron
715 .async_add_exclusive_publication(channel, stream_id)?
716 .poll_blocking(Duration::from_secs(5))?;
717
718 for i in 0..11 {
719 while publication.offer(
720 "123456".as_bytes(),
721 Handlers::no_reserved_value_supplier_handler(),
722 ) <= 0
723 {
724 sleep(Duration::from_millis(50));
725 archive.poll_for_recording_signals()?;
726 let err = archive.poll_for_error_response_as_string(4096)?;
727 if !err.is_empty() {
728 panic!("{}", err);
729 }
730 archive.idle();
731 }
732 info!("sent message {i} [test_aeron_archive]");
733 }
734
735 archive.idle();
736 let session_id = publication.get_constants()?.session_id;
737 info!("publication session id {}", session_id);
738 let stop_position = publication.position();
740 info!(
741 "publication stop position {} [publication={:?}]",
742 stop_position,
743 publication.get_constants()
744 );
745 let counters_reader = aeron.counters_reader();
746 info!("counters reader ready {:?}", counters_reader);
747
748 let mut counter_id = -1;
749
750 let start = Instant::now();
751 while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
752 counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
753 info!("counter id {}", counter_id);
754 }
755
756 assert!(counter_id >= 0);
757
758 info!("counter id {counter_id}, session id {session_id}");
759 while counters_reader.get_counter_value(counter_id) < stop_position {
760 info!(
761 "current archive publication stop position {}",
762 counters_reader.get_counter_value(counter_id)
763 );
764 sleep(Duration::from_millis(50));
765 }
766 info!(
767 "found archive publication stop position {}",
768 counters_reader.get_counter_value(counter_id)
769 );
770
771 archive.stop_recording_channel_and_stream(channel, stream_id)?;
772 drop(publication);
773
774 info!("list recordings");
775 let found_recording_id = Cell::new(-1);
776 let start_pos = Cell::new(-1);
777 let end_pos = Cell::new(-1);
778 let start = Instant::now();
779 while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
780 let mut count = 0;
781 archive.list_recordings_for_uri_once(
782 &mut count,
783 0,
784 i32::MAX,
785 channel,
786 stream_id,
787 |d: AeronArchiveRecordingDescriptor| {
788 assert_eq!(d.stream_id, stream_id);
789 info!("found recording {:#?}", d);
790 info!(
791 "strippedChannel={}, originalChannel={}",
792 d.stripped_channel(),
793 d.original_channel()
794 );
795 if d.stop_position > d.start_position && d.stop_position > 0 {
796 found_recording_id.set(d.recording_id);
797 start_pos.set(d.start_position);
798 end_pos.set(d.stop_position);
799 }
800
801 let copy = d.clone_struct();
803 assert_eq!(copy.deref(), d.deref());
804 assert_eq!(copy.recording_id, d.recording_id);
805 assert_eq!(copy.control_session_id, d.control_session_id);
806 assert_eq!(copy.mtu_length, d.mtu_length);
807 assert_eq!(copy.source_identity_length, d.source_identity_length);
808 },
809 )?;
810 archive.poll_for_recording_signals()?;
811 let err = archive.poll_for_error_response_as_string(4096)?;
812 if !err.is_empty() {
813 panic!("{}", err);
814 }
815 }
816 assert!(start.elapsed() < Duration::from_secs(5));
817 info!("start replay");
818 let params = AeronArchiveReplayParams::new(
819 0,
820 i32::MAX,
821 start_pos.get(),
822 end_pos.get() - start_pos.get(),
823 0,
824 0,
825 )?;
826 info!("replay params {:#?}", params);
827 let replay_stream_id = 45;
828 let replay_session_id =
829 archive.start_replay(found_recording_id.get(), channel, replay_stream_id, ¶ms)?;
830 let session_id = replay_session_id as i32;
831
832 info!("replay session id {}", replay_session_id);
833 info!("session id {}", session_id);
834 let channel_replay =
835 format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
836 info!("archive id: {}", archive.get_archive_id());
837
838 info!("add subscription {:?}", channel_replay);
839 let subscription = aeron
840 .async_add_subscription(
841 &channel_replay,
842 replay_stream_id,
843 Some(&Handler::leak(AeronAvailableImageLogger)),
844 Some(&Handler::leak(AeronUnavailableImageLogger)),
845 )?
846 .poll_blocking(Duration::from_secs(10))?;
847
848 #[derive(Default)]
849 struct FragmentHandler {
850 count: Cell<usize>,
851 }
852
853 impl AeronFragmentHandlerCallback for FragmentHandler {
854 fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
855 assert_eq!(buffer, "123456".as_bytes());
856
857 self.count.set(self.count.get() + 1);
859 }
860 }
861
862 let poll = Handler::leak(FragmentHandler::default());
863
864 let start = Instant::now();
865 while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
866 {
867 let err = archive.poll_for_error_response_as_string(4096)?;
868 if !err.is_empty() {
869 panic!("{}", err);
870 }
871 }
872 assert!(
873 start.elapsed() < Duration::from_secs(10),
874 "messages not received {:?}",
875 poll.count
876 );
877 info!("aeron {:?}", aeron);
878 info!("ctx {:?}", archive_context);
879 assert_eq!(11, poll.count.get());
880 Ok(())
881 }
882
883 #[test]
884 #[serial]
885 fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
886 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
887 let archive_connector =
888 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
889 let archive = archive_connector
890 .poll_blocking(Duration::from_secs(30))
891 .expect("failed to connect to archive");
892
893 let invalid_channel = "invalid:channel".into_c_string();
894 let result =
895 archive.start_recording(&invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
896 assert!(
897 result.is_err(),
898 "Expected error when starting recording with an invalid channel"
899 );
900 Ok(())
901 }
902
903 #[test]
904 #[serial]
905 fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
906 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
907 let archive_connector =
908 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
909 let archive = archive_connector
910 .poll_blocking(Duration::from_secs(30))
911 .expect("failed to connect to archive");
912
913 let nonexistent_channel = &"aeron:udp?endpoint=localhost:9999".into_c_string();
914 let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
915 assert!(
916 result.is_err(),
917 "Expected error when stopping recording on a non-existent channel"
918 );
919 Ok(())
920 }
921
922 #[test]
923 #[serial]
924 fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
925 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
926 let archive_connector =
927 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
928 let archive = archive_connector
929 .poll_blocking(Duration::from_secs(30))
930 .expect("failed to connect to archive");
931
932 let invalid_recording_id = -999;
933 let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
934 let result = archive.start_replay(
935 invalid_recording_id,
936 &"aeron:udp?endpoint=localhost:8888".into_c_string(),
937 STREAM_ID,
938 ¶ms,
939 );
940 assert!(
941 result.is_err(),
942 "Expected error when starting replay with an invalid recording id"
943 );
944 Ok(())
945 }
946
947 #[test]
948 #[serial]
949 fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
950 let (aeron, archive_context, media_driver) = start_aeron_archive()?;
951 let archive_connector =
952 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
953 let archive = archive_connector
954 .poll_blocking(Duration::from_secs(30))
955 .expect("failed to connect to archive");
956
957 drop(archive);
958
959 let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
960 let new_archive = archive_connector
961 .poll_blocking(Duration::from_secs(30))
962 .expect("failed to reconnect to archive");
963 assert!(
964 new_archive.get_archive_id() > 0,
965 "Reconnected archive should have a valid archive id"
966 );
967
968 drop(media_driver);
969 Ok(())
970 }
971}