1#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
/// Raw bindings, textually included from `$OUT_DIR/bindings.rs`.
///
/// The file is pulled in verbatim at compile time (presumably emitted by the
/// build script — confirm), so the two lints below are silenced for the whole
/// module rather than per item.
#[allow(improper_ctypes_definitions)]
#[allow(unpredictable_function_pointer_comparisons)]
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
23
24use bindings::*;
25use std::cell::Cell;
26use std::os::raw::c_int;
27use std::time::{Duration, Instant};
28
29pub mod testing;
30
31include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
32include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
33
/// Shorter name for the binding type `aeron_archive_source_location_t`.
pub type SourceLocation = bindings::aeron_archive_source_location_t;
/// `AERON_ARCHIVE_SOURCE_LOCATION_LOCAL` re-exported under a shorter name.
pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
/// `AERON_ARCHIVE_SOURCE_LOCATION_REMOTE` re-exported under a shorter name.
pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
39
40pub struct NoOpAeronIdleStrategyFunc;
41
42impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
43 fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
44}
45
46pub struct RecordingPos;
47impl RecordingPos {
48 pub fn find_counter_id_by_session(
49 counter_reader: &AeronCountersReader,
50 session_id: i32,
51 ) -> i32 {
52 unsafe {
53 aeron_archive_recording_pos_find_counter_id_by_session_id(
54 counter_reader.get_inner(),
55 session_id,
56 )
57 }
58 }
59 pub fn find_counter_id_by_recording_id(
60 counter_reader: &AeronCountersReader,
61 recording_id: i64,
62 ) -> i32 {
63 unsafe {
64 aeron_archive_recording_pos_find_counter_id_by_recording_id(
65 counter_reader.get_inner(),
66 recording_id,
67 )
68 }
69 }
70
71 pub fn get_recording_id_block(
74 counters_reader: &AeronCountersReader,
75 counter_id: i32,
76 wait: Duration,
77 ) -> Result<i64, AeronCError> {
78 let mut result = Self::get_recording_id(counters_reader, counter_id);
79 let instant = Instant::now();
80
81 while result.is_err() && instant.elapsed() < wait {
82 result = Self::get_recording_id(counters_reader, counter_id);
83 #[cfg(debug_assertions)]
84 std::thread::sleep(Duration::from_millis(10));
85 }
86
87 return result;
88 }
89
90 pub fn get_recording_id(
93 counters_reader: &AeronCountersReader,
94 counter_id: i32,
95 ) -> Result<i64, AeronCError> {
96 pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
99
100 pub const RECORD_ALLOCATED: i32 = 1;
102
103 pub const NULL_RECORDING_ID: i64 = -1;
105
106 if counter_id < 0 {
107 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
108 }
109
110 let state = counters_reader.counter_state(counter_id)?;
111 if state != RECORD_ALLOCATED {
112 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
113 }
114
115 let type_id = counters_reader.counter_type_id(counter_id)?;
116 if type_id != RECORDING_POSITION_TYPE_ID {
117 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
118 }
119
120 let recording_id = Cell::new(-1);
126 counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
127 if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
128 let mut val = [0u8; 8];
129 val.copy_from_slice(&key[0..8]);
130 let Ok(value) = i64::from_le_bytes(val).try_into();
131 recording_id.set(value);
132 }
133 });
134 let recording_id = recording_id.get();
135 if recording_id < 0 {
136 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
137 }
138
139 Ok(recording_id)
140 }
141}
142
143unsafe extern "C" fn default_encoded_credentials(
144 _clientd: *mut std::os::raw::c_void,
145) -> *mut aeron_archive_encoded_credentials_t {
146 let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
148 data: std::ptr::null(),
149 length: 0,
150 });
151 Box::into_raw(empty_credentials)
152}
153
154impl AeronArchive {
155 pub fn aeron(&self) -> Aeron {
156 self.get_archive_context().get_aeron()
157 }
158}
159
160impl AeronArchiveAsyncConnect {
161 #[inline]
162 pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
164 let resource_async = Self::new(ctx)?;
165 resource_async.inner.add_dependency(aeron.clone());
166 Ok(resource_async)
167 }
168}
169
170macro_rules! impl_archive_position_methods {
171 ($pub_type:ty) => {
172 impl $pub_type {
173 pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
176 if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
177 let counter_reader = &aeron.counters_reader();
178 self.get_archive_position_with(counter_reader)
179 } else {
180 Err(AeronCError::from_code(-1))
181 }
182 }
183
184 pub fn get_archive_position_with(
187 &self,
188 counters: &AeronCountersReader,
189 ) -> Result<i64, AeronCError> {
190 let session_id = self.get_constants()?.session_id();
191 let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
192 if counter_id < 0 {
193 return Err(AeronCError::from_code(counter_id));
194 }
195 let position = counters.get_counter_value(counter_id);
196 if position < 0 {
197 return Err(AeronCError::from_code(position as i32));
198 }
199 Ok(position)
200 }
201
202 pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
205 let archive_position = self.get_archive_position().unwrap_or(-1);
206 if archive_position < 0 {
207 return false;
208 }
209 self.position() - archive_position <= length_inclusive as i64
210 }
211 }
212 };
213}
214
215impl_archive_position_methods!(AeronPublication);
216impl_archive_position_methods!(AeronExclusivePublication);
217
218impl AeronArchiveContext {
219 pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
223 self.set_credentials_supplier(
224 Some(default_encoded_credentials),
225 None,
226 None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
227 )
228 }
229
230 pub fn new_with_no_credentials_supplier(
234 aeron: &Aeron,
235 request_control_channel: &str,
236 response_control_channel: &str,
237 recording_events_channel: &str,
238 ) -> Result<AeronArchiveContext, AeronCError> {
239 let context = Self::new()?;
240 context.set_no_credentials_supplier()?;
241 context.set_aeron(aeron)?;
242 context.set_control_request_channel(&request_control_channel.into_c_string())?;
243 context.set_control_response_channel(&response_control_channel.into_c_string())?;
244 context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
245 context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
247 Ok(context)
248 }
249}
250
251#[cfg(test)]
252mod tests {
253 use super::*;
254 use log::{error, info};
255
256 use crate::testing::EmbeddedArchiveMediaDriverProcess;
257 use serial_test::serial;
258 use std::cell::Cell;
259 use std::error;
260 use std::error::Error;
261 use std::str::FromStr;
262 use std::sync::atomic::{AtomicBool, Ordering};
263 use std::sync::Arc;
264 use std::thread::{sleep, JoinHandle};
265 use std::time::{Duration, Instant};
266
267 #[derive(Default, Debug)]
268 struct ErrorCount {
269 error_count: usize,
270 }
271
272 impl AeronErrorHandlerCallback for ErrorCount {
273 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
274 error!("Aeron error {}: {}", error_code, msg);
275 self.error_count += 1;
276 }
277 }
278
    /// Control request channel passed to the embedded archive driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
    /// Control response channel passed to the embedded archive driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
    /// Recording events channel passed to the embedded archive driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_RECORDING_EVENTS: &str =
        "aeron:udp?control-mode=dynamic|control=localhost:8012";
283
284 #[test]
285 fn test_uri_string_builder() -> Result<(), AeronCError> {
286 let builder = AeronUriStringBuilder::default();
287
288 builder.init_new()?;
289 builder
290 .media(Media::Udp)? .mtu_length(1024 * 64)?
292 .set_initial_position(127424949617280, 1182294755, 65536)?;
293 let uri = builder.build(1024)?;
294 assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);
295
296 builder.init_new()?;
297 let uri = builder
298 .media(Media::Udp)?
299 .control_mode(ControlMode::Dynamic)?
300 .reliable(false)?
301 .ttl(2)?
302 .endpoint("localhost:1235")?
303 .control("localhost:1234")?
304 .build(1024)?;
305 assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);
306
307 let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
308 .ttl(5)?
309 .build(1024)?;
310
311 assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);
312
313 let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;
314
315 assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);
316
317 Ok(())
318 }
319
    /// Stream id used by the replay-merge publisher/subscriber and by the
    /// negative-path tests.
    pub const STREAM_ID: i32 = 1033;
    /// Prefix of every message sent by the replay-merge publisher thread.
    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
    /// `control=` endpoint shared by the publication, recording and live channels.
    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
    /// `endpoint=` of the channel handed to `start_recording`.
    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
    /// `endpoint=` of the live destination used by replay merge.
    pub const LIVE_ENDPOINT: &str = "localhost:23267";
    /// `endpoint=` of the replay destination used by replay merge.
    pub const REPLAY_ENDPOINT: &str = "localhost:0";
326 #[test]
329 #[serial]
330 fn test_simple_replay_merge() -> Result<(), AeronCError> {
331 let _ = env_logger::Builder::new()
332 .is_test(true)
333 .filter_level(log::LevelFilter::Info)
334 .try_init();
335
336 EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
337 .expect("failed to kill all java processes");
338
339 assert!(is_udp_port_available(23265));
340 assert!(is_udp_port_available(23266));
341 assert!(is_udp_port_available(23267));
342 assert!(is_udp_port_available(23268));
343 let id = Aeron::nano_clock();
344 let aeron_dir = format!("target/aeron/{}/shm", id);
345 let archive_dir = format!("target/aeron/{}/archive", id);
346
347 info!("starting archive media driver");
348 let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
349 &aeron_dir,
350 &format!("{}/archive", aeron_dir),
351 ARCHIVE_CONTROL_REQUEST,
352 ARCHIVE_CONTROL_RESPONSE,
353 ARCHIVE_RECORDING_EVENTS,
354 )
355 .expect("Failed to start embedded media driver");
356
357 info!("connecting to archive");
358 let (archive, aeron) = media_driver
359 .archive_connect()
360 .expect("Could not connect to archive client");
361
362 let running = Arc::new(AtomicBool::new(true));
363
364 info!("connected to archive, adding publication");
365 assert!(!aeron.is_closed());
366
367 let (session_id, publisher_thread) =
368 reply_merge_publisher(&archive, aeron.clone(), running.clone())?;
369
370 {
371 let context = AeronContext::new()?;
372 context.set_dir(&media_driver.aeron_dir)?;
373 let error_handler = Handler::leak(ErrorCount::default());
374 context.set_error_handler(Some(&error_handler))?;
375 let aeron = Aeron::new(&context)?;
376 aeron.start()?;
377 let aeron_archive_context = archive.get_archive_context();
378 let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
379 &aeron,
380 aeron_archive_context.get_control_request_channel(),
381 aeron_archive_context.get_control_response_channel(),
382 aeron_archive_context.get_recording_events_channel(),
383 )?;
384 aeron_archive_context.set_error_handler(Some(&error_handler))?;
385 let archive = AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
386 .poll_blocking(Duration::from_secs(30))
387 .expect("failed to connect to archive");
388 replay_merge_subscription(&archive, aeron.clone(), session_id)?;
389 }
390
391 running.store(false, Ordering::Release);
392 publisher_thread.join().unwrap();
393
394 Ok(())
395 }
396
397 fn reply_merge_publisher(
398 archive: &AeronArchive,
399 aeron: Aeron,
400 running: Arc<AtomicBool>,
401 ) -> Result<(i32, JoinHandle<()>), AeronCError> {
402 let publication = aeron.add_publication(
403 &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
405 .into_c_string(),
406 STREAM_ID,
407 Duration::from_secs(5),
408 )?;
409
410 info!(
411 "publication {} [status={:?}]",
412 publication.channel(),
413 publication.channel_status()
414 );
415 assert_eq!(1, publication.channel_status());
416
417 let session_id = publication.session_id();
418 let recording_channel = format!(
419 "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
421 );
422 info!("recording channel {}", recording_channel);
423 archive.start_recording(
424 &recording_channel.into_c_string(),
425 STREAM_ID,
426 SOURCE_LOCATION_REMOTE,
427 true,
428 )?;
429
430 info!("waiting for publisher to be connected");
431 while !publication.is_connected() {
432 thread::sleep(Duration::from_millis(100));
433 }
434 info!("publisher to be connected");
435 let counters_reader = aeron.counters_reader();
436 let mut caught_up_count = 0;
437 let publisher_thread = thread::spawn(move || {
438 let mut message_count = 0;
439
440 while running.load(Ordering::Acquire) {
441 let message = format!("{}{}", MESSAGE_PREFIX, message_count);
442 while publication.offer(
443 message.as_bytes(),
444 Handlers::no_reserved_value_supplier_handler(),
445 ) <= 0
446 {
447 thread::sleep(Duration::from_millis(10));
448 }
449 message_count += 1;
450 if message_count % 10_000 == 0 {
451 info!(
452 "Published {} messages [position={}]",
453 message_count,
454 publication.position()
455 );
456 }
457 if message_count > 10_000 {
459 while !publication.is_archive_position_with(0) {
461 thread::sleep(Duration::from_micros(300));
462 }
463 caught_up_count += 1;
464 }
465 }
466 assert!(caught_up_count > 0);
467 info!("Publisher thread terminated");
468 });
469 Ok((session_id, publisher_thread))
470 }
471
472 fn replay_merge_subscription(
473 archive: &AeronArchive,
474 aeron: Aeron,
475 session_id: i32,
476 ) -> Result<(), AeronCError> {
477 let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
479 info!("replay channel {:?}", replay_channel);
480
481 let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
482 info!("replay destination {:?}", replay_destination);
483
484 let live_destination =
485 format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
486 .into_c_string();
487 info!("live destination {:?}", live_destination);
488
489 let counters_reader = aeron.counters_reader();
490 let mut counter_id = -1;
491
492 while counter_id < 0 {
493 counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
494 }
495 info!(
496 "counter id {} {:?}",
497 counter_id,
498 counters_reader.get_counter_label(counter_id, 1024)
499 );
500 info!(
501 "counter id {} position={:?}",
502 counter_id,
503 counters_reader.get_counter_value(counter_id)
504 );
505
506 let start_position = 0;
528 let recording_id = RecordingPos::get_recording_id_block(
529 &aeron.counters_reader(),
530 counter_id,
531 Duration::from_secs(5),
532 )?;
533
534 let subscribe_channel =
535 format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
536 info!("subscribe channel {:?}", subscribe_channel);
537 let subscription = aeron.add_subscription(
538 &subscribe_channel,
539 STREAM_ID,
540 Handlers::no_available_image_handler(),
541 Handlers::no_unavailable_image_handler(),
542 Duration::from_secs(5),
543 )?;
544
545 let replay_merge = AeronArchiveReplayMerge::new(
546 &subscription,
547 &archive,
548 &replay_channel,
549 &replay_destination,
550 &live_destination,
551 recording_id,
552 start_position,
553 Aeron::epoch_clock(),
554 10_000,
555 )?;
556
557 info!(
558 "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
559 recording_id,
560 start_position,
561 subscribe_channel,
562 &replay_channel,
563 &replay_destination,
564 &live_destination
565 );
566
567 info!(
578 "about to start_replay [maxRecordPosition={:?}]",
579 archive.get_max_recorded_position(recording_id)
580 );
581
582 let mut reply_count = 0;
583 while !replay_merge.is_merged() {
584 assert!(!replay_merge.has_failed());
585 if replay_merge.poll_once(
586 |buffer, _header| {
587 reply_count += 1;
588 if reply_count % 10_000 == 0 {
589 info!(
590 "replay-merge [count={}, isMerged={}, isLive={}]",
591 reply_count,
592 replay_merge.is_merged(),
593 replay_merge.is_live_added()
594 );
595 }
596 },
597 100,
598 )? == 0
599 {
600 let err = archive.poll_for_error_response_as_string(4096)?;
601 if !err.is_empty() {
602 panic!("{}", err);
603 }
604 if Aeron::errmsg().len() > 0 && "no error" != Aeron::errmsg() {
605 panic!("{}", Aeron::errmsg());
606 }
607 archive.poll_for_recording_signals()?;
608 thread::sleep(Duration::from_millis(100));
609 }
610 }
611 assert!(!replay_merge.has_failed());
612 assert!(replay_merge.is_live_added());
613 assert!(reply_count > 10_000);
614 Ok(())
615 }
616
617 #[test]
618 fn version_check() {
619 let major = unsafe { crate::aeron_version_major() };
620 let minor = unsafe { crate::aeron_version_minor() };
621 let patch = unsafe { crate::aeron_version_patch() };
622
623 let aeron_version = format!("{}.{}.{}", major, minor, patch);
624
625 let cargo_version = "1.48.10";
626 assert_eq!(aeron_version, cargo_version);
627 }
628
629 use std::thread;
630
631 pub fn start_aeron_archive() -> Result<
632 (
633 Aeron,
634 AeronArchiveContext,
635 EmbeddedArchiveMediaDriverProcess,
636 ),
637 Box<dyn Error>,
638 > {
639 let id = Aeron::nano_clock();
640 let aeron_dir = format!("target/aeron/{}/shm", id);
641 let archive_dir = format!("target/aeron/{}/archive", id);
642
643 let request_port = find_unused_udp_port(8000).expect("Could not find port");
644 let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
645 let recording_event_port =
646 find_unused_udp_port(response_port + 1).expect("Could not find port");
647 let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
648 let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
649 let recording_events_channel =
650 &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
651 assert_ne!(request_control_channel, response_control_channel);
652
653 let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
654 &aeron_dir,
655 &archive_dir,
656 request_control_channel,
657 response_control_channel,
658 recording_events_channel,
659 )
660 .expect("Failed to start Java process");
661
662 let aeron_context = AeronContext::new()?;
663 aeron_context.set_dir(&aeron_dir.into_c_string())?;
664 aeron_context.set_client_name(&"test".into_c_string())?;
665 aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
666 AeronPublicationErrorFrameHandlerLogger,
667 )))?;
668 let error_handler = Handler::leak(ErrorCount::default());
669 aeron_context.set_error_handler(Some(&error_handler))?;
670 let aeron = Aeron::new(&aeron_context)?;
671 aeron.start()?;
672
673 let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
674 &aeron,
675 request_control_channel,
676 response_control_channel,
677 recording_events_channel,
678 )?;
679 archive_context.set_error_handler(Some(&error_handler))?;
680 Ok((aeron, archive_context, archive_media_driver))
681 }
682
683 #[test]
684 #[serial]
685 pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
686 let _ = env_logger::Builder::new()
687 .is_test(true)
688 .filter_level(log::LevelFilter::Info)
689 .try_init();
690 EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
691 .expect("failed to kill all java processes");
692
693 let (aeron, archive_context, media_driver) = start_aeron_archive()?;
694
695 assert!(!aeron.is_closed());
696
697 info!("connected to aeron");
698
699 let archive_connector =
700 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
701 let archive = archive_connector
702 .poll_blocking(Duration::from_secs(30))
703 .expect("failed to connect to aeron archive media driver");
704
705 assert!(archive.get_archive_id() > 0);
706
707 let channel = AERON_IPC_STREAM;
708 let stream_id = 10;
709
710 let subscription_id =
711 archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
712
713 assert!(subscription_id >= 0);
714 info!("subscription id {}", subscription_id);
715
716 let publication = aeron
717 .async_add_exclusive_publication(channel, stream_id)?
718 .poll_blocking(Duration::from_secs(5))?;
719
720 for i in 0..11 {
721 while publication.offer(
722 "123456".as_bytes(),
723 Handlers::no_reserved_value_supplier_handler(),
724 ) <= 0
725 {
726 sleep(Duration::from_millis(50));
727 archive.poll_for_recording_signals()?;
728 let err = archive.poll_for_error_response_as_string(4096)?;
729 if !err.is_empty() {
730 panic!("{}", err);
731 }
732 archive.idle();
733 }
734 info!("sent message {i} [test_aeron_archive]");
735 }
736
737 archive.idle();
738 let session_id = publication.get_constants()?.session_id;
739 info!("publication session id {}", session_id);
740 let stop_position = publication.position();
742 info!(
743 "publication stop position {} [publication={:?}]",
744 stop_position,
745 publication.get_constants()
746 );
747 let counters_reader = aeron.counters_reader();
748 info!("counters reader ready {:?}", counters_reader);
749
750 let mut counter_id = -1;
751
752 let start = Instant::now();
753 while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
754 counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
755 info!("counter id {}", counter_id);
756 }
757
758 assert!(counter_id >= 0);
759
760 info!("counter id {counter_id}, session id {session_id}");
761 while counters_reader.get_counter_value(counter_id) < stop_position {
762 info!(
763 "current archive publication stop position {}",
764 counters_reader.get_counter_value(counter_id)
765 );
766 sleep(Duration::from_millis(50));
767 }
768 info!(
769 "found archive publication stop position {}",
770 counters_reader.get_counter_value(counter_id)
771 );
772
773 archive.stop_recording_channel_and_stream(channel, stream_id)?;
774 drop(publication);
775
776 info!("list recordings");
777 let found_recording_id = Cell::new(-1);
778 let start_pos = Cell::new(-1);
779 let end_pos = Cell::new(-1);
780 let start = Instant::now();
781 while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
782 let mut count = 0;
783 archive.list_recordings_for_uri_once(
784 &mut count,
785 0,
786 i32::MAX,
787 channel,
788 stream_id,
789 |d: AeronArchiveRecordingDescriptor| {
790 assert_eq!(d.stream_id, stream_id);
791 info!("found recording {:#?}", d);
792 info!(
793 "strippedChannel={}, originalChannel={}",
794 d.stripped_channel(),
795 d.original_channel()
796 );
797 if d.stop_position > d.start_position && d.stop_position > 0 {
798 found_recording_id.set(d.recording_id);
799 start_pos.set(d.start_position);
800 end_pos.set(d.stop_position);
801 }
802
803 let copy = d.clone_struct();
805 assert_eq!(copy.deref(), d.deref());
806 assert_eq!(copy.recording_id, d.recording_id);
807 assert_eq!(copy.control_session_id, d.control_session_id);
808 assert_eq!(copy.mtu_length, d.mtu_length);
809 assert_eq!(copy.source_identity_length, d.source_identity_length);
810 },
811 )?;
812 archive.poll_for_recording_signals()?;
813 let err = archive.poll_for_error_response_as_string(4096)?;
814 if !err.is_empty() {
815 panic!("{}", err);
816 }
817 }
818 assert!(start.elapsed() < Duration::from_secs(5));
819 info!("start replay");
820 let params = AeronArchiveReplayParams::new(
821 0,
822 i32::MAX,
823 start_pos.get(),
824 end_pos.get() - start_pos.get(),
825 0,
826 0,
827 )?;
828 info!("replay params {:#?}", params);
829 let replay_stream_id = 45;
830 let replay_session_id =
831 archive.start_replay(found_recording_id.get(), channel, replay_stream_id, ¶ms)?;
832 let session_id = replay_session_id as i32;
833
834 info!("replay session id {}", replay_session_id);
835 info!("session id {}", session_id);
836 let channel_replay =
837 format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
838 info!("archive id: {}", archive.get_archive_id());
839
840 info!("add subscription {:?}", channel_replay);
841 let subscription = aeron
842 .async_add_subscription(
843 &channel_replay,
844 replay_stream_id,
845 Some(&Handler::leak(AeronAvailableImageLogger)),
846 Some(&Handler::leak(AeronUnavailableImageLogger)),
847 )?
848 .poll_blocking(Duration::from_secs(10))?;
849
850 #[derive(Default)]
851 struct FragmentHandler {
852 count: Cell<usize>,
853 }
854
855 impl AeronFragmentHandlerCallback for FragmentHandler {
856 fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
857 assert_eq!(buffer, "123456".as_bytes());
858
859 self.count.set(self.count.get() + 1);
861 }
862 }
863
864 let poll = Handler::leak(FragmentHandler::default());
865
866 let start = Instant::now();
867 while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
868 {
869 let err = archive.poll_for_error_response_as_string(4096)?;
870 if !err.is_empty() {
871 panic!("{}", err);
872 }
873 }
874 assert!(
875 start.elapsed() < Duration::from_secs(10),
876 "messages not received {:?}",
877 poll.count
878 );
879 info!("aeron {:?}", aeron);
880 info!("ctx {:?}", archive_context);
881 assert_eq!(11, poll.count.get());
882 Ok(())
883 }
884
885 #[test]
886 #[serial]
887 fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
888 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
889 let archive_connector =
890 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
891 let archive = archive_connector
892 .poll_blocking(Duration::from_secs(30))
893 .expect("failed to connect to archive");
894
895 let invalid_channel = "invalid:channel".into_c_string();
896 let result =
897 archive.start_recording(&invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
898 assert!(
899 result.is_err(),
900 "Expected error when starting recording with an invalid channel"
901 );
902 Ok(())
903 }
904
905 #[test]
906 #[serial]
907 fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
908 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
909 let archive_connector =
910 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
911 let archive = archive_connector
912 .poll_blocking(Duration::from_secs(30))
913 .expect("failed to connect to archive");
914
915 let nonexistent_channel = &"aeron:udp?endpoint=localhost:9999".into_c_string();
916 let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
917 assert!(
918 result.is_err(),
919 "Expected error when stopping recording on a non-existent channel"
920 );
921 Ok(())
922 }
923
924 #[test]
925 #[serial]
926 fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
927 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
928 let archive_connector =
929 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
930 let archive = archive_connector
931 .poll_blocking(Duration::from_secs(30))
932 .expect("failed to connect to archive");
933
934 let invalid_recording_id = -999;
935 let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
936 let result = archive.start_replay(
937 invalid_recording_id,
938 &"aeron:udp?endpoint=localhost:8888".into_c_string(),
939 STREAM_ID,
940 ¶ms,
941 );
942 assert!(
943 result.is_err(),
944 "Expected error when starting replay with an invalid recording id"
945 );
946 Ok(())
947 }
948
949 #[test]
950 #[serial]
951 fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
952 let (aeron, archive_context, media_driver) = start_aeron_archive()?;
953 let archive_connector =
954 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
955 let archive = archive_connector
956 .poll_blocking(Duration::from_secs(30))
957 .expect("failed to connect to archive");
958
959 drop(archive);
960
961 let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
962 let new_archive = archive_connector
963 .poll_blocking(Duration::from_secs(30))
964 .expect("failed to reconnect to archive");
965 assert!(
966 new_archive.get_archive_id() > 0,
967 "Reconnected archive should have a valid archive id"
968 );
969
970 drop(media_driver);
971 Ok(())
972 }
973}
974
975#[cfg(test)]
977mod slow_consumer_test;