1#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
// Lints silenced for the included bindings only; the module body is not
// hand-written here (presumably emitted by the build script - confirm in build.rs).
#[allow(improper_ctypes_definitions)]
#[allow(unpredictable_function_pointer_comparisons)]
/// Raw FFI declarations, textually included from `bindings.rs` in the build's `OUT_DIR`.
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
23
24use bindings::*;
25use std::cell::Cell;
26use std::os::raw::c_int;
27use std::time::{Duration, Instant};
28
29pub mod testing;
30
31include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
32include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
33
/// Short alias for the bindings type `aeron_archive_source_location_t`.
pub type SourceLocation = bindings::aeron_archive_source_location_t;
/// `AERON_ARCHIVE_SOURCE_LOCATION_LOCAL`, re-exported under a shorter name.
pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
/// `AERON_ARCHIVE_SOURCE_LOCATION_REMOTE`, re-exported under a shorter name.
pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
39
/// Idle-strategy callback that does nothing.
///
/// Both arguments (the opaque state pointer and the work count) are ignored and
/// the function returns immediately. It is the callback installed by
/// `AeronArchiveContext::new_with_no_credentials_supplier`.
unsafe extern "C" fn no_op_idle_strategy_func(_: *mut std::os::raw::c_void, _: c_int) {}
45
46pub struct RecordingPos;
47impl RecordingPos {
48 pub fn find_counter_id_by_session(
49 counter_reader: &AeronCountersReader,
50 session_id: i32,
51 ) -> i32 {
52 unsafe {
53 aeron_archive_recording_pos_find_counter_id_by_session_id(
54 counter_reader.get_inner(),
55 session_id,
56 )
57 }
58 }
59 pub fn find_counter_id_by_recording_id(
60 counter_reader: &AeronCountersReader,
61 recording_id: i64,
62 ) -> i32 {
63 unsafe {
64 aeron_archive_recording_pos_find_counter_id_by_recording_id(
65 counter_reader.get_inner(),
66 recording_id,
67 )
68 }
69 }
70
71 pub fn get_recording_id_block(
74 counters_reader: &AeronCountersReader,
75 counter_id: i32,
76 wait: Duration,
77 ) -> Result<i64, AeronCError> {
78 let mut result = Self::get_recording_id(counters_reader, counter_id);
79 let instant = Instant::now();
80
81 while result.is_err() && instant.elapsed() < wait {
82 result = Self::get_recording_id(counters_reader, counter_id);
83 #[cfg(debug_assertions)]
84 std::thread::sleep(Duration::from_millis(10));
85 }
86
87 return result;
88 }
89
90 pub fn get_recording_id(
93 counters_reader: &AeronCountersReader,
94 counter_id: i32,
95 ) -> Result<i64, AeronCError> {
96 pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
99
100 pub const RECORD_ALLOCATED: i32 = 1;
102
103 pub const NULL_RECORDING_ID: i64 = -1;
105
106 if counter_id < 0 {
107 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
108 }
109
110 let state = counters_reader.counter_state(counter_id)?;
111 if state != RECORD_ALLOCATED {
112 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
113 }
114
115 let type_id = counters_reader.counter_type_id(counter_id)?;
116 if type_id != RECORDING_POSITION_TYPE_ID {
117 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
118 }
119
120 let recording_id = Cell::new(-1);
126 counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
127 if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
128 let mut val = [0u8; 8];
129 val.copy_from_slice(&key[0..8]);
130 let Ok(value) = i64::from_le_bytes(val).try_into();
131 recording_id.set(value);
132 }
133 });
134 let recording_id = recording_id.get();
135 if recording_id < 0 {
136 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
137 }
138
139 Ok(recording_id)
140 }
141}
142
143unsafe extern "C" fn default_encoded_credentials(
144 _clientd: *mut std::os::raw::c_void,
145) -> *mut aeron_archive_encoded_credentials_t {
146 let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
148 data: std::ptr::null(),
149 length: 0,
150 });
151 Box::into_raw(empty_credentials)
152}
153
154unsafe extern "C" fn default_credentials_free(
155 credentials: *mut aeron_archive_encoded_credentials_t,
156 _clientd: *mut std::os::raw::c_void,
157) {
158 if !credentials.is_null() {
159 let _ = Box::from_raw(credentials);
160 }
161}
162
163impl AeronArchive {
164 pub fn aeron(&self) -> Aeron {
165 self.get_archive_context().get_aeron()
166 }
167}
168
169impl AeronArchiveAsyncConnect {
170 #[inline]
171 pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
173 let resource_async = Self::new(ctx)?;
174 resource_async.inner.add_dependency(aeron.clone());
175 Ok(resource_async)
176 }
177}
178
179macro_rules! impl_archive_position_methods {
180 ($pub_type:ty) => {
181 impl $pub_type {
182 pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
185 if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
186 let counter_reader = &aeron.counters_reader();
187 self.get_archive_position_with(counter_reader)
188 } else {
189 Err(AeronCError::from_code(-1))
190 }
191 }
192
193 pub fn get_archive_position_with(
196 &self,
197 counters: &AeronCountersReader,
198 ) -> Result<i64, AeronCError> {
199 let session_id = self.get_constants()?.session_id();
200 let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
201 if counter_id < 0 {
202 return Err(AeronCError::from_code(counter_id));
203 }
204 let position = counters.get_counter_value(counter_id);
205 if position < 0 {
206 return Err(AeronCError::from_code(position as i32));
207 }
208 Ok(position)
209 }
210
211 pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
214 let archive_position = self.get_archive_position().unwrap_or(-1);
215 if archive_position < 0 {
216 return false;
217 }
218 self.position() - archive_position <= length_inclusive as i64
219 }
220 }
221 };
222}
223
224impl_archive_position_methods!(AeronPublication);
225impl_archive_position_methods!(AeronExclusivePublication);
226
227impl AeronArchiveContext {
228 pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
232 let result = unsafe {
233 bindings::aeron_archive_context_set_credentials_supplier(
234 self.get_inner(),
235 Some(default_encoded_credentials),
236 None,
237 Some(default_credentials_free),
238 std::ptr::null_mut(),
239 )
240 };
241 if result < 0 {
242 Err(AeronCError::from_code(result))
243 } else {
244 Ok(result)
245 }
246 }
247
248 pub fn new_with_no_credentials_supplier(
252 aeron: &Aeron,
253 request_control_channel: &str,
254 response_control_channel: &str,
255 recording_events_channel: &str,
256 ) -> Result<AeronArchiveContext, AeronCError> {
257 let context = Self::new()?;
258 context.set_no_credentials_supplier()?;
259 context.set_aeron(aeron)?;
260 context.set_control_request_channel(&request_control_channel.into_c_string())?;
261 context.set_control_response_channel(&response_control_channel.into_c_string())?;
262 context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
263 let result = unsafe {
266 bindings::aeron_archive_context_set_idle_strategy(
267 context.get_inner(),
268 Some(no_op_idle_strategy_func),
269 std::ptr::null_mut(),
270 )
271 };
272 if result < 0 {
273 return Err(AeronCError::from_code(result));
274 }
275 Ok(context)
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282 use log::{error, info};
283
284 use crate::testing::EmbeddedArchiveMediaDriverProcess;
285 use serial_test::serial;
286 use std::cell::Cell;
287 use std::error;
288 use std::error::Error;
289 use std::str::FromStr;
290 use std::sync::atomic::{AtomicBool, Ordering};
291 use std::sync::Arc;
292 use std::thread::{sleep, JoinHandle};
293 use std::time::{Duration, Instant};
294
295 #[derive(Default, Debug)]
296 struct ErrorCount {
297 error_count: usize,
298 }
299
300 impl AeronErrorHandlerCallback for ErrorCount {
301 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
302 error!("Aeron error {}: {}", error_code, msg);
303 self.error_count += 1;
304 }
305 }
306
    /// Archive control request channel passed to the embedded driver in `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
    /// Archive control response channel passed to the embedded driver in `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
    /// Archive recording events channel passed to the embedded driver in `test_simple_replay_merge`.
    pub const ARCHIVE_RECORDING_EVENTS: &str =
        "aeron:udp?control-mode=dynamic|control=localhost:8012";
311
312 #[test]
313 fn test_uri_string_builder() -> Result<(), AeronCError> {
314 let builder = AeronUriStringBuilder::default();
315
316 builder.init_new()?;
317 builder
318 .media(Media::Udp)? .mtu_length(1024 * 64)?
320 .set_initial_position(127424949617280, 1182294755, 65536)?;
321 let uri = builder.build(1024)?;
322 assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);
323
324 builder.init_new()?;
325 let uri = builder
326 .media(Media::Udp)?
327 .control_mode(ControlMode::Dynamic)?
328 .reliable(false)?
329 .ttl(2)?
330 .endpoint("localhost:1235")?
331 .control("localhost:1234")?
332 .build(1024)?;
333 assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);
334
335 let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
336 .ttl(5)?
337 .build(1024)?;
338
339 assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);
340
341 let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;
342
343 assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);
344
345 Ok(())
346 }
347
    /// Stream id used by the replay-merge publication, recording and subscription.
    pub const STREAM_ID: i32 = 1033;
    /// Prefix of every message payload sent by the replay-merge publisher thread.
    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
    /// `control=` endpoint shared by the publication, recording and live destination channels.
    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
    /// `endpoint=` of the recording channel.
    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
    /// `endpoint=` of the live destination used by the replay merge.
    pub const LIVE_ENDPOINT: &str = "localhost:23267";
    /// `endpoint=` of the replay destination used by the replay merge.
    pub const REPLAY_ENDPOINT: &str = "localhost:0";
354 #[test]
357 #[serial]
358 fn test_simple_replay_merge() -> Result<(), AeronCError> {
359 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
360
361 EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
362 .expect("failed to kill all java processes");
363
364 assert!(is_udp_port_available(23265));
365 assert!(is_udp_port_available(23266));
366 assert!(is_udp_port_available(23267));
367 assert!(is_udp_port_available(23268));
368 let id = Aeron::nano_clock();
369 let aeron_dir = format!("target/aeron/{}/shm", id);
370 let archive_dir = format!("target/aeron/{}/archive", id);
371
372 info!("starting archive media driver");
373 let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
374 &aeron_dir,
375 &format!("{}/archive", aeron_dir),
376 ARCHIVE_CONTROL_REQUEST,
377 ARCHIVE_CONTROL_RESPONSE,
378 ARCHIVE_RECORDING_EVENTS,
379 )
380 .expect("Failed to start embedded media driver");
381
382 info!("connecting to archive");
383 let (archive, aeron) = media_driver
384 .archive_connect()
385 .expect("Could not connect to archive client");
386
387 let running = Arc::new(AtomicBool::new(true));
388
389 info!("connected to archive, adding publication");
390 assert!(!aeron.is_closed());
391
392 let (session_id, publisher_thread) =
393 reply_merge_publisher(&archive, aeron.clone(), running.clone())?;
394
395 {
396 let context = AeronContext::new()?;
397 context.set_dir(&media_driver.aeron_dir)?;
398 let mut error_handler = Handler::leak(ErrorCount::default());
399 context.set_error_handler(Some(&error_handler))?;
400 context.set_driver_timeout_ms(60_000)?;
401
402 let inner: Result<(), AeronCError> = (|| {
404 let aeron = Aeron::new(&context)?;
405 aeron.start()?;
406 let aeron_archive_context = archive.get_archive_context();
407 let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
408 &aeron,
409 aeron_archive_context.get_control_request_channel(),
410 aeron_archive_context.get_control_response_channel(),
411 aeron_archive_context.get_recording_events_channel(),
412 )?;
413 aeron_archive_context.set_message_timeout_ns(60_000_000_000)?;
414 aeron_archive_context.set_error_handler(Some(&error_handler))?;
415 let merge_archive =
416 AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
417 .poll_blocking(Duration::from_secs(60))?;
418 replay_merge_subscription(&merge_archive, aeron.clone(), session_id)?;
419 Ok(())
420 })();
421
422 error_handler.release();
423 inner?;
424 }
425
426 running.store(false, Ordering::Release);
427 publisher_thread.join().unwrap();
428 drop(archive);
429 drop(aeron);
430 drop(media_driver);
431
432 Ok(())
433 }
434
435 fn reply_merge_publisher(
436 archive: &AeronArchive,
437 aeron: Aeron,
438 running: Arc<AtomicBool>,
439 ) -> Result<(i32, JoinHandle<()>), AeronCError> {
440 let publication = aeron.add_publication(
441 &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
443 .into_c_string(),
444 STREAM_ID,
445 Duration::from_secs(5),
446 )?;
447
448 info!(
449 "publication {} [status={:?}]",
450 publication.channel(),
451 publication.channel_status()
452 );
453 assert_eq!(1, publication.channel_status());
454
455 let session_id = publication.session_id();
456 let recording_channel = format!(
457 "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
459 );
460 info!("recording channel {}", recording_channel);
461 archive.start_recording(
462 &recording_channel.into_c_string(),
463 STREAM_ID,
464 SOURCE_LOCATION_REMOTE,
465 true,
466 )?;
467
468 info!("waiting for publisher to be connected");
469 while !publication.is_connected() {
470 thread::sleep(Duration::from_millis(100));
471 }
472 info!("publisher to be connected");
473 let counters_reader = aeron.counters_reader();
474 let mut caught_up_count = 0;
475 let publisher_thread = thread::spawn(move || {
476 let mut message_count = 0;
477
478 while running.load(Ordering::Acquire) {
479 let message = format!("{}{}", MESSAGE_PREFIX, message_count);
480 while publication.offer(
481 message.as_bytes(),
482 Handlers::no_reserved_value_supplier_handler(),
483 ) <= 0
484 {
485 thread::sleep(Duration::from_millis(10));
486 }
487 message_count += 1;
488 if message_count % 10_000 == 0 {
489 info!(
490 "Published {} messages [position={}]",
491 message_count,
492 publication.position()
493 );
494 }
495 if message_count > 10_000 {
497 while !publication.is_archive_position_with(0) {
499 thread::sleep(Duration::from_micros(300));
500 }
501 caught_up_count += 1;
502 }
503 }
504 assert!(caught_up_count > 0);
505 if let Err(err) = publication.close(Handlers::no_notification_handler()) {
506 info!("publisher close returned error: {err:?}");
507 }
508 info!("Publisher thread terminated");
509 });
510 Ok((session_id, publisher_thread))
511 }
512
    /// Replays the recording of `session_id` and merges it into the live stream.
    ///
    /// Waits for the session's recording-position counter, resolves the
    /// recording id, adds a manual-control-mode subscription and drives an
    /// `AeronArchiveReplayMerge` until `is_merged()` reports true. Panics if the
    /// archive or `Aeron::errmsg()` reports an error while polling, or if one of
    /// the final assertions fails.
    ///
    /// # Errors
    /// Propagates failures from resolving the recording id, adding the
    /// subscription, creating the replay merge, and polling.
    fn replay_merge_subscription(
        archive: &AeronArchive,
        aeron: Aeron,
        session_id: i32,
    ) -> Result<(), AeronCError> {
        let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
        info!("replay channel {:?}", replay_channel);

        let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
        info!("replay destination {:?}", replay_destination);

        let live_destination =
            format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
                .into_c_string();
        info!("live destination {:?}", live_destination);

        let counters_reader = aeron.counters_reader();
        let mut counter_id = -1;

        // Spin (no sleep, no timeout) until the recording-position counter for the session exists.
        while counter_id < 0 {
            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
        }
        info!(
            "counter id {} {:?}",
            counter_id,
            counters_reader.get_counter_label(counter_id, 1024)
        );
        info!(
            "counter id {} position={:?}",
            counter_id,
            counters_reader.get_counter_value(counter_id)
        );

        // Replay from the beginning of the recording.
        let start_position = 0;
        // Retry for up to 5 seconds to decode the recording id from the counter.
        let recording_id = RecordingPos::get_recording_id_block(
            &aeron.counters_reader(),
            counter_id,
            Duration::from_secs(5),
        )?;

        let subscribe_channel =
            format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
        info!("subscribe channel {:?}", subscribe_channel);
        let subscription = aeron.add_subscription(
            &subscribe_channel,
            STREAM_ID,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
            Duration::from_secs(5),
        )?;

        // NOTE(review): the trailing 10_000 is presumably the merge progress timeout in ms - confirm.
        let replay_merge = AeronArchiveReplayMerge::new(
            &subscription,
            &archive,
            &replay_channel,
            &replay_destination,
            &live_destination,
            recording_id,
            start_position,
            Aeron::epoch_clock(),
            10_000,
        )?;

        info!(
            "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
            recording_id,
            start_position,
            subscribe_channel,
            &replay_channel,
            &replay_destination,
            &live_destination
        );

        info!(
            "about to start_replay [maxRecordPosition={:?}]",
            archive.get_max_recorded_position(recording_id)
        );

        // Number of fragments delivered to the poll callback.
        let mut reply_count = 0;
        while !replay_merge.is_merged() {
            assert!(!replay_merge.has_failed());
            if replay_merge.poll_once(
                |buffer, _header| {
                    reply_count += 1;
                    if reply_count % 10_000 == 0 {
                        info!(
                            "replay-merge [count={}, isMerged={}, isLive={}]",
                            reply_count,
                            replay_merge.is_merged(),
                            replay_merge.is_live_added()
                        );
                    }
                },
                100,
            )? == 0
            {
                // Nothing was polled: surface any archive/client error, then back off.
                let err = archive.poll_for_error_response_as_string(4096)?;
                if !err.is_empty() {
                    panic!("{}", err);
                }
                if Aeron::errmsg().len() > 0 && "no error" != Aeron::errmsg() {
                    panic!("{}", Aeron::errmsg());
                }
                archive.poll_for_recording_signals()?;
                thread::sleep(Duration::from_millis(100));
            }
        }
        assert!(!replay_merge.has_failed());
        assert!(replay_merge.is_live_added());
        assert!(reply_count > 10_000, "no replay-merge fragments received");
        // Close errors are logged, not propagated.
        if let Err(err) = replay_merge.close() {
            info!("replay merge close returned error: {err:?}");
        }
        if let Err(err) = subscription.close(Handlers::no_notification_handler()) {
            info!("replay subscription close returned error: {err:?}");
        }
        Ok(())
    }
663
664 #[test]
665 fn version_check() {
666 let major = unsafe { crate::aeron_version_major() };
667 let minor = unsafe { crate::aeron_version_minor() };
668 let patch = unsafe { crate::aeron_version_patch() };
669
670 let aeron_version = format!("{}.{}.{}", major, minor, patch);
671
672 let cargo_version = "1.50.2";
673 assert_eq!(aeron_version, cargo_version);
674 }
675
676 use std::thread;
677
678 fn start_aeron_archive() -> Result<
679 (
680 Aeron,
681 AeronArchiveContext,
682 EmbeddedArchiveMediaDriverProcess,
683 Handler<AeronPublicationErrorFrameHandlerLogger>,
684 Handler<ErrorCount>,
685 ),
686 Box<dyn Error>,
687 > {
688 let id = Aeron::nano_clock();
689 let aeron_dir = format!("target/aeron/{}/shm", id);
690 let archive_dir = format!("target/aeron/{}/archive", id);
691
692 let request_port = find_unused_udp_port(8000).expect("Could not find port");
693 let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
694 let recording_event_port =
695 find_unused_udp_port(response_port + 1).expect("Could not find port");
696 let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
697 let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
698 let recording_events_channel =
699 &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
700 assert_ne!(request_control_channel, response_control_channel);
701
702 let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
703 &aeron_dir,
704 &archive_dir,
705 request_control_channel,
706 response_control_channel,
707 recording_events_channel,
708 )
709 .expect("Failed to start Java process");
710
711 let aeron_context = AeronContext::new()?;
712 aeron_context.set_dir(&aeron_dir.into_c_string())?;
713 aeron_context.set_client_name(&"test".into_c_string())?;
714 let mut pub_error_frame_handler = Handler::leak(AeronPublicationErrorFrameHandlerLogger);
715 aeron_context.set_publication_error_frame_handler(Some(&pub_error_frame_handler))?;
716 let mut error_handler = Handler::leak(ErrorCount::default());
717 aeron_context.set_error_handler(Some(&error_handler))?;
718
719 let inner: Result<(Aeron, AeronArchiveContext), Box<dyn Error>> = (|| {
721 let aeron = Aeron::new(&aeron_context)?;
722 aeron.start()?;
723 let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
724 &aeron,
725 request_control_channel,
726 response_control_channel,
727 recording_events_channel,
728 )?;
729 archive_context.set_error_handler(Some(&error_handler))?;
730 Ok((aeron, archive_context))
731 })();
732
733 match inner {
734 Ok((aeron, archive_context)) => Ok((
735 aeron,
736 archive_context,
737 archive_media_driver,
738 pub_error_frame_handler,
739 error_handler,
740 )),
741 Err(e) => {
742 pub_error_frame_handler.release();
743 error_handler.release();
744 Err(e)
745 }
746 }
747 }
748
749 #[test]
750 #[serial]
751 pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
752 rusteron_code_gen::test_logger::init(log::LevelFilter::Info);
753 EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
754 .expect("failed to kill all java processes");
755
756 let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
757 start_aeron_archive()?;
758
759 let test_result: Result<(), Box<dyn error::Error>> = (|| {
760 assert!(!aeron.is_closed());
761
762 info!("connected to aeron");
763
764 let archive_connector =
765 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
766 let archive = archive_connector
767 .poll_blocking(Duration::from_secs(30))
768 .expect("failed to connect to aeron archive media driver");
769
770 assert!(archive.get_archive_id() > 0);
771
772 let channel = AERON_IPC_STREAM;
773 let stream_id = 10;
774
775 let subscription_id =
776 archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
777
778 assert!(subscription_id >= 0);
779 info!("subscription id {}", subscription_id);
780
781 let publication = aeron
782 .async_add_exclusive_publication(channel, stream_id)?
783 .poll_blocking(Duration::from_secs(5))?;
784
785 for i in 0..11 {
786 while publication.offer(
787 "123456".as_bytes(),
788 Handlers::no_reserved_value_supplier_handler(),
789 ) <= 0
790 {
791 sleep(Duration::from_millis(50));
792 archive.poll_for_recording_signals()?;
793 let err = archive.poll_for_error_response_as_string(4096)?;
794 if !err.is_empty() {
795 return Err(std::io::Error::other(err).into());
796 }
797 archive.idle();
798 }
799 info!("sent message {i} [test_aeron_archive]");
800 }
801
802 archive.idle();
803 let session_id = publication.get_constants()?.session_id;
804 info!("publication session id {}", session_id);
805 let stop_position = publication.position();
807 info!(
808 "publication stop position {} [publication={:?}]",
809 stop_position,
810 publication.get_constants()
811 );
812 let counters_reader = aeron.counters_reader();
813 info!("counters reader ready {:?}", counters_reader);
814
815 let mut counter_id = -1;
816
817 let start = Instant::now();
818 while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
819 counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
820 info!("counter id {}", counter_id);
821 }
822
823 assert!(counter_id >= 0);
824
825 info!("counter id {counter_id}, session id {session_id}");
826 while counters_reader.get_counter_value(counter_id) < stop_position {
827 info!(
828 "current archive publication stop position {}",
829 counters_reader.get_counter_value(counter_id)
830 );
831 sleep(Duration::from_millis(50));
832 }
833 info!(
834 "found archive publication stop position {}",
835 counters_reader.get_counter_value(counter_id)
836 );
837
838 archive.stop_recording_channel_and_stream(channel, stream_id)?;
839 drop(publication);
840
841 info!("list recordings");
842 let found_recording_id = Cell::new(-1);
843 let start_pos = Cell::new(-1);
844 let end_pos = Cell::new(-1);
845 let start = Instant::now();
846 while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
847 let mut count = 0;
848 archive.list_recordings_for_uri_once(
849 &mut count,
850 0,
851 i32::MAX,
852 channel,
853 stream_id,
854 |d: AeronArchiveRecordingDescriptor| {
855 assert_eq!(d.stream_id, stream_id);
856 info!("found recording {:#?}", d);
857 info!(
858 "strippedChannel={}, originalChannel={}",
859 d.stripped_channel(),
860 d.original_channel()
861 );
862 if d.stop_position > d.start_position && d.stop_position > 0 {
863 found_recording_id.set(d.recording_id);
864 start_pos.set(d.start_position);
865 end_pos.set(d.stop_position);
866 }
867
868 let copy = d.clone_struct();
870 assert_eq!(copy.deref(), d.deref());
871 assert_eq!(copy.recording_id, d.recording_id);
872 assert_eq!(copy.control_session_id, d.control_session_id);
873 assert_eq!(copy.mtu_length, d.mtu_length);
874 assert_eq!(copy.source_identity_length, d.source_identity_length);
875 },
876 )?;
877 archive.poll_for_recording_signals()?;
878 let err = archive.poll_for_error_response_as_string(4096)?;
879 if !err.is_empty() {
880 return Err(std::io::Error::other(err).into());
881 }
882 }
883 assert!(start.elapsed() < Duration::from_secs(5));
884 info!("start replay");
885 let params = AeronArchiveReplayParams::new(
886 0,
887 i32::MAX,
888 start_pos.get(),
889 end_pos.get() - start_pos.get(),
890 0,
891 0,
892 )?;
893 info!("replay params {:#?}", params);
894 let replay_stream_id = 45;
895 let replay_session_id = archive.start_replay(
896 found_recording_id.get(),
897 channel,
898 replay_stream_id,
899 ¶ms,
900 )?;
901 let session_id = replay_session_id as i32;
902
903 info!("replay session id {}", replay_session_id);
904 info!("session id {}", session_id);
905 let channel_replay =
906 format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
907 info!("archive id: {}", archive.get_archive_id());
908
909 info!("add subscription {:?}", channel_replay);
910 let mut avail_image_handler = Handler::leak(AeronAvailableImageLogger);
911 let mut unavail_image_handler = Handler::leak(AeronUnavailableImageLogger);
912 let replay_result: Result<(), Box<dyn error::Error>> = (|| {
913 let subscription = aeron
914 .async_add_subscription(
915 &channel_replay,
916 replay_stream_id,
917 Some(&avail_image_handler),
918 Some(&unavail_image_handler),
919 )?
920 .poll_blocking(Duration::from_secs(10))?;
921
922 #[derive(Default)]
923 struct FragmentHandler {
924 count: Cell<usize>,
925 }
926
927 impl AeronFragmentHandlerCallback for FragmentHandler {
928 fn handle_aeron_fragment_handler(
929 &mut self,
930 buffer: &[u8],
931 _header: AeronHeader,
932 ) {
933 assert_eq!(buffer, "123456".as_bytes());
934
935 self.count.set(self.count.get() + 1);
937 }
938 }
939
940 let mut poll = Handler::leak(FragmentHandler::default());
941 let poll_result: Result<(), Box<dyn error::Error>> = (|| {
942 let wait_timeout = Duration::from_secs(30);
943 let start = Instant::now();
944 while start.elapsed() < wait_timeout
945 && subscription.poll(Some(&poll), 100)? <= 0
946 {
947 let err = archive.poll_for_error_response_as_string(4096)?;
948 if !err.is_empty() {
949 return Err(std::io::Error::other(err).into());
950 }
951 }
952
953 if start.elapsed() >= wait_timeout {
954 return Err(std::io::Error::other(format!(
955 "messages not received {:?}",
956 poll.count
957 ))
958 .into());
959 }
960
961 info!("aeron {:?}", aeron);
962 info!("ctx {:?}", archive_context);
963 if poll.count.get() != 11 {
964 return Err(std::io::Error::other(format!(
965 "expected 11 replayed messages, got {}",
966 poll.count.get()
967 ))
968 .into());
969 }
970 Ok(())
971 })();
972
973 drop(subscription);
974 poll.release();
975 poll_result
976 })();
977
978 avail_image_handler.release();
979 unavail_image_handler.release();
980 replay_result?;
981 drop(archive);
982 Ok(())
983 })();
984
985 drop(aeron);
986 drop(media_driver);
987 pub_error_frame_handler.release();
988 error_handler.release();
989 test_result
990 }
991
992 #[test]
993 #[serial]
994 fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
995 let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
996 start_aeron_archive()?;
997 let archive_connector =
998 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
999 let archive = archive_connector
1000 .poll_blocking(Duration::from_secs(30))
1001 .expect("failed to connect to archive");
1002
1003 let invalid_channel = "invalid:channel".into_c_string();
1004 let result =
1005 archive.start_recording(&invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
1006 assert!(
1007 result.is_err(),
1008 "Expected error when starting recording with an invalid channel"
1009 );
1010 drop(archive);
1011 drop(aeron);
1012 drop(media_driver);
1013 pub_error_frame_handler.release();
1014 error_handler.release();
1015 Ok(())
1016 }
1017
1018 #[test]
1019 #[serial]
1020 fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
1021 let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
1022 start_aeron_archive()?;
1023 let archive_connector =
1024 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
1025 let archive = archive_connector
1026 .poll_blocking(Duration::from_secs(30))
1027 .expect("failed to connect to archive");
1028
1029 let nonexistent_channel = &"aeron:udp?endpoint=localhost:9999".into_c_string();
1030 let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
1031 assert!(
1032 result.is_err(),
1033 "Expected error when stopping recording on a non-existent channel"
1034 );
1035 drop(archive);
1036 drop(aeron);
1037 drop(media_driver);
1038 pub_error_frame_handler.release();
1039 error_handler.release();
1040 Ok(())
1041 }
1042
1043 #[test]
1044 #[serial]
1045 fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
1046 let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
1047 start_aeron_archive()?;
1048 let archive_connector =
1049 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
1050 let archive = archive_connector
1051 .poll_blocking(Duration::from_secs(30))
1052 .expect("failed to connect to archive");
1053
1054 let invalid_recording_id = -999;
1055 let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
1056 let result = archive.start_replay(
1057 invalid_recording_id,
1058 &"aeron:udp?endpoint=localhost:8888".into_c_string(),
1059 STREAM_ID,
1060 ¶ms,
1061 );
1062 assert!(
1063 result.is_err(),
1064 "Expected error when starting replay with an invalid recording id"
1065 );
1066 drop(archive);
1067 drop(aeron);
1068 drop(media_driver);
1069 pub_error_frame_handler.release();
1070 error_handler.release();
1071 Ok(())
1072 }
1073
1074 #[test]
1075 #[serial]
1076 fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
1077 let (aeron, archive_context, media_driver, mut pub_error_frame_handler, mut error_handler) =
1078 start_aeron_archive()?;
1079 let archive_connector =
1080 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
1081 let archive = archive_connector
1082 .poll_blocking(Duration::from_secs(30))
1083 .expect("failed to connect to archive");
1084
1085 drop(archive);
1086
1087 let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
1088 let new_archive = archive_connector
1089 .poll_blocking(Duration::from_secs(30))
1090 .expect("failed to reconnect to archive");
1091 assert!(
1092 new_archive.get_archive_id() > 0,
1093 "Reconnected archive should have a valid archive id"
1094 );
1095
1096 drop(new_archive);
1097 drop(aeron);
1098 drop(media_driver);
1099 pub_error_frame_handler.release();
1100 error_handler.release();
1101 Ok(())
1102 }
1103}
1104
1105#[cfg(test)]
1107mod slow_consumer_test;