rusteron_archive/
lib.rs

1/**/
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
13//! - **`backtrace`**: When enabled will log a backtrace for each AeronCError
14//! - **`extra-logging`**: When enabled will log when resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed
15//! - **`log-c-bindings`**: When enabled will log every C binding call with arguments and return values. Useful for debugging FFI interactions
16//! - **`precompile`**: When enabled will use precompiled C code instead of requiring cmake and java to be installed
17
/// Raw FFI declarations, textually included from the `bindings.rs` file that the build
/// places in `OUT_DIR`.
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
21
22use bindings::*;
23use std::cell::Cell;
24use std::os::raw::c_int;
25use std::time::{Duration, Instant};
26
27pub mod testing;
28
29include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
30include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
31
/// Short alias for the archive source-location type from the generated bindings.
pub type SourceLocation = bindings::aeron_archive_source_location_t;
/// Source-location value `AERON_ARCHIVE_SOURCE_LOCATION_LOCAL`, re-exported under a shorter name.
pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
/// Source-location value `AERON_ARCHIVE_SOURCE_LOCATION_REMOTE`, re-exported under a shorter name.
pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
37
38pub struct NoOpAeronIdleStrategyFunc;
39
40impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
41    fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
42}
43
44pub struct RecordingPos;
45impl RecordingPos {
46    pub fn find_counter_id_by_session(
47        counter_reader: &AeronCountersReader,
48        session_id: i32,
49    ) -> i32 {
50        unsafe {
51            aeron_archive_recording_pos_find_counter_id_by_session_id(
52                counter_reader.get_inner(),
53                session_id,
54            )
55        }
56    }
57    pub fn find_counter_id_by_recording_id(
58        counter_reader: &AeronCountersReader,
59        recording_id: i64,
60    ) -> i32 {
61        unsafe {
62            aeron_archive_recording_pos_find_counter_id_by_recording_id(
63                counter_reader.get_inner(),
64                recording_id,
65            )
66        }
67    }
68
69    /// Return the recordingId embedded in the key of the given counter
70    /// if it is indeed a "recording position" counter. Otherwise return -1.
71    pub fn get_recording_id_block(
72        counters_reader: &AeronCountersReader,
73        counter_id: i32,
74        wait: Duration,
75    ) -> Result<i64, AeronCError> {
76        let mut result = Self::get_recording_id(counters_reader, counter_id);
77        let instant = Instant::now();
78
79        while result.is_err() && instant.elapsed() < wait {
80            result = Self::get_recording_id(counters_reader, counter_id);
81            #[cfg(debug_assertions)]
82            std::thread::sleep(Duration::from_millis(10));
83        }
84
85        return result;
86    }
87
88    /// Return the recordingId embedded in the key of the given counter
89    /// if it is indeed a "recording position" counter. Otherwise return -1.
90    pub fn get_recording_id(
91        counters_reader: &AeronCountersReader,
92        counter_id: i32,
93    ) -> Result<i64, AeronCError> {
94        /// The type id for an Aeron Archive recording position counter.
95        /// In Aeron Java, this is AeronCounters.ARCHIVE_RECORDING_POSITION_TYPE_ID (which is typically 100).
96        pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
97
98        /// from Aeron Java code
99        pub const RECORD_ALLOCATED: i32 = 1;
100
101        /// A constant to mean "no valid recording ID".
102        pub const NULL_RECORDING_ID: i64 = -1;
103
104        if counter_id < 0 {
105            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
106        }
107
108        let state = counters_reader.counter_state(counter_id)?;
109        if state != RECORD_ALLOCATED {
110            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
111        }
112
113        let type_id = counters_reader.counter_type_id(counter_id)?;
114        if type_id != RECORDING_POSITION_TYPE_ID {
115            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
116        }
117
118        // Read the key area. For a RECORDING_POSITION_TYPE_ID counter:
119        //    - offset 0..8 => the i64 recording_id
120        //    - offset 8..12 => the session_id (int)
121        //    etc...
122        // only need the first 8 bytes to get the recordingId.
123        let recording_id = Cell::new(-1);
124        counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
125            if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
126                let mut val = [0u8; 8];
127                val.copy_from_slice(&key[0..8]);
128                let Ok(value) = i64::from_le_bytes(val).try_into();
129                recording_id.set(value);
130            }
131        });
132        let recording_id = recording_id.get();
133        if recording_id < 0 {
134            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
135        }
136
137        Ok(recording_id)
138    }
139}
140
141unsafe extern "C" fn default_encoded_credentials(
142    _clientd: *mut std::os::raw::c_void,
143) -> *mut aeron_archive_encoded_credentials_t {
144    // Allocate a zeroed instance of `aeron_archive_encoded_credentials_t`
145    let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
146        data: std::ptr::null(),
147        length: 0,
148    });
149    Box::into_raw(empty_credentials)
150}
151
152impl AeronArchive {
153    pub fn aeron(&self) -> Aeron {
154        self.get_archive_context().get_aeron()
155    }
156}
157
158impl AeronArchiveAsyncConnect {
159    #[inline]
160    /// recommend using this method instead of standard `new` as it will link the archive to aeron so if a drop occurs archive is dropped before aeron
161    pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
162        let resource_async = Self::new(ctx)?;
163        resource_async.inner.add_dependency(aeron.clone());
164        Ok(resource_async)
165    }
166}
167
168macro_rules! impl_archive_position_methods {
169    ($pub_type:ty) => {
170        impl $pub_type {
171            /// Retrieves the current active live archive position using the Aeron counters.
172            /// Returns an error if not found.
173            pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
174                if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
175                    let counter_reader = &aeron.counters_reader();
176                    self.get_archive_position_with(counter_reader)
177                } else {
178                    Err(AeronCError::from_code(-1))
179                }
180            }
181
182            /// Retrieves the current active live archive position using the provided counter reader.
183            /// Returns an error if not found.
184            pub fn get_archive_position_with(
185                &self,
186                counters: &AeronCountersReader,
187            ) -> Result<i64, AeronCError> {
188                let session_id = self.get_constants()?.session_id();
189                let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
190                if counter_id < 0 {
191                    return Err(AeronCError::from_code(counter_id));
192                }
193                let position = counters.get_counter_value(counter_id);
194                if position < 0 {
195                    return Err(AeronCError::from_code(position as i32));
196                }
197                Ok(position)
198            }
199
200            /// Checks if the publication's current position is within a specified inclusive length
201            /// of the archive position.
202            pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
203                let archive_position = self.get_archive_position().unwrap_or(-1);
204                if archive_position < 0 {
205                    return false;
206                }
207                self.position() - archive_position <= length_inclusive as i64
208            }
209        }
210    };
211}
212
213impl_archive_position_methods!(AeronPublication);
214impl_archive_position_methods!(AeronExclusivePublication);
215
216impl AeronArchiveContext {
217    // The method below sets no credentials supplier, which is essential for the operation
218    // of the Aeron Archive Context. The `set_credentials_supplier` must be set to prevent
219    // segmentation faults in the C bindings.
220    pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
221        self.set_credentials_supplier(
222            Some(default_encoded_credentials),
223            None,
224            None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
225        )
226    }
227
228    /// This method creates a new `AeronArchiveContext` with a no-op credentials supplier.
229    /// If you do not set a credentials supplier, it will segfault.
230    /// This method ensures that a non-functional credentials supplier is set to avoid the segfault.
231    pub fn new_with_no_credentials_supplier(
232        aeron: &Aeron,
233        request_control_channel: &str,
234        response_control_channel: &str,
235        recording_events_channel: &str,
236    ) -> Result<AeronArchiveContext, AeronCError> {
237        let context = Self::new()?;
238        context.set_no_credentials_supplier()?;
239        context.set_aeron(aeron)?;
240        context.set_control_request_channel(&request_control_channel.into_c_string())?;
241        context.set_control_response_channel(&response_control_channel.into_c_string())?;
242        context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
243        // see https://github.com/gsrxyz/rusteron/issues/18
244        context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
245        Ok(context)
246    }
247}
248
249#[cfg(test)]
250mod tests {
251    use super::*;
252    use log::{error, info};
253
254    use crate::testing::EmbeddedArchiveMediaDriverProcess;
255    use serial_test::serial;
256    use std::cell::Cell;
257    use std::error;
258    use std::error::Error;
259    use std::str::FromStr;
260    use std::sync::atomic::{AtomicBool, Ordering};
261    use std::sync::Arc;
262    use std::thread::{sleep, JoinHandle};
263    use std::time::{Duration, Instant};
264
    /// Error handler used by the tests: logs every reported error and counts them.
    #[derive(Default, Debug)]
    struct ErrorCount {
        // Number of errors received through the callback so far.
        error_count: usize,
    }

    impl AeronErrorHandlerCallback for ErrorCount {
        /// Logs the error code and message at `error` level, then increments the counter.
        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
            error!("Aeron error {}: {}", error_code, msg);
            self.error_count += 1;
        }
    }
276
    /// Control request channel passed to the embedded archive media driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
    /// Control response channel passed to the embedded archive media driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
    /// Recording events channel passed to the embedded archive media driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_RECORDING_EVENTS: &str =
        "aeron:udp?control-mode=dynamic|control=localhost:8012";
281
    /// Exercises `AeronUriStringBuilder`: building from scratch, re-initialising the same
    /// builder, and round-tripping through `from_str` / `parse`, comparing each built URI
    /// against its expected string form.
    #[test]
    fn test_uri_string_builder() -> Result<(), AeronCError> {
        let builder = AeronUriStringBuilder::default();

        // Case 1: media + mtu + initial position.
        builder.init_new()?;
        builder
            .media(Media::Udp)? // very important to set media else set_initial_position will give an error of -1
            .mtu_length(1024 * 64)?
            .set_initial_position(127424949617280, 1182294755, 65536)?;
        let uri = builder.build(1024)?;
        assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);

        // Case 2: the same builder is re-initialised and used for a dynamic-control channel.
        builder.init_new()?;
        let uri = builder
            .media(Media::Udp)?
            .control_mode(ControlMode::Dynamic)?
            .reliable(false)?
            .ttl(2)?
            .endpoint("localhost:1235")?
            .control("localhost:1234")?
            .build(1024)?;
        assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);

        // Case 3: start from an existing URI string via `FromStr` and add a parameter.
        let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
            .ttl(5)?
            .build(1024)?;

        assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);

        // Case 4: parse the URI built above and overwrite the ttl set in case 3.
        let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;

        assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);

        Ok(())
    }
317
    /// Stream id shared by the publication, the recording and the subscription in the
    /// replay-merge test.
    pub const STREAM_ID: i32 = 1033;
    /// Prefix of every message payload sent by the publisher thread.
    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
    /// Control endpoint used by the publication, recording and live-destination channels.
    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
    /// Endpoint of the channel that is recorded by the archive.
    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
    /// Endpoint of the live destination used by the replay merge.
    pub const LIVE_ENDPOINT: &str = "localhost:23267";
    /// Endpoint of the replay destination.
    /// NOTE(review): port 0 presumably asks for an ephemeral port - confirm.
    pub const REPLAY_ENDPOINT: &str = "localhost:0";
    // pub const REPLAY_ENDPOINT: &str = "localhost:23268";
325
326    #[test]
327    #[serial]
328    fn test_simple_replay_merge() -> Result<(), AeronCError> {
329        let _ = env_logger::Builder::new()
330            .is_test(true)
331            .filter_level(log::LevelFilter::Info)
332            .try_init();
333
334        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
335            .expect("failed to kill all java processes");
336
337        assert!(is_udp_port_available(23265));
338        assert!(is_udp_port_available(23266));
339        assert!(is_udp_port_available(23267));
340        assert!(is_udp_port_available(23268));
341        let id = Aeron::nano_clock();
342        let aeron_dir = format!("target/aeron/{}/shm", id);
343        let archive_dir = format!("target/aeron/{}/archive", id);
344
345        info!("starting archive media driver");
346        let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
347            &aeron_dir,
348            &format!("{}/archive", aeron_dir),
349            ARCHIVE_CONTROL_REQUEST,
350            ARCHIVE_CONTROL_RESPONSE,
351            ARCHIVE_RECORDING_EVENTS,
352        )
353        .expect("Failed to start embedded media driver");
354
355        info!("connecting to archive");
356        let (archive, aeron) = media_driver
357            .archive_connect()
358            .expect("Could not connect to archive client");
359
360        let running = Arc::new(AtomicBool::new(true));
361
362        info!("connected to archive, adding publication");
363        assert!(!aeron.is_closed());
364
365        let (session_id, publisher_thread) =
366            reply_merge_publisher(&archive, aeron.clone(), running.clone())?;
367
368        {
369            let context = AeronContext::new()?;
370            context.set_dir(&media_driver.aeron_dir)?;
371            let error_handler = Handler::leak(ErrorCount::default());
372            context.set_error_handler(Some(&error_handler))?;
373            let aeron = Aeron::new(&context)?;
374            aeron.start()?;
375            let aeron_archive_context = archive.get_archive_context();
376            let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
377                &aeron,
378                aeron_archive_context.get_control_request_channel(),
379                aeron_archive_context.get_control_response_channel(),
380                aeron_archive_context.get_recording_events_channel(),
381            )?;
382            aeron_archive_context.set_error_handler(Some(&error_handler))?;
383            let archive = AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
384                .poll_blocking(Duration::from_secs(30))
385                .expect("failed to connect to archive");
386            replay_merge_subscription(&archive, aeron.clone(), session_id)?;
387        }
388
389        running.store(false, Ordering::Release);
390        publisher_thread.join().unwrap();
391
392        Ok(())
393    }
394
395    fn reply_merge_publisher(
396        archive: &AeronArchive,
397        aeron: Aeron,
398        running: Arc<AtomicBool>,
399    ) -> Result<(i32, JoinHandle<()>), AeronCError> {
400        let publication = aeron.add_publication(
401            // &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536|fc=tagged,g:99901/1,t:5s"),
402            &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
403                .into_c_string(),
404            STREAM_ID,
405            Duration::from_secs(5),
406        )?;
407
408        info!(
409            "publication {} [status={:?}]",
410            publication.channel(),
411            publication.channel_status()
412        );
413        assert_eq!(1, publication.channel_status());
414
415        let session_id = publication.session_id();
416        let recording_channel = format!(
417            // "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}|gtag=99901"
418            "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
419        );
420        info!("recording channel {}", recording_channel);
421        archive.start_recording(
422            &recording_channel.into_c_string(),
423            STREAM_ID,
424            SOURCE_LOCATION_REMOTE,
425            true,
426        )?;
427
428        info!("waiting for publisher to be connected");
429        while !publication.is_connected() {
430            thread::sleep(Duration::from_millis(100));
431        }
432        info!("publisher to be connected");
433        let counters_reader = aeron.counters_reader();
434        let mut caught_up_count = 0;
435        let publisher_thread = thread::spawn(move || {
436            let mut message_count = 0;
437
438            while running.load(Ordering::Acquire) {
439                let message = format!("{}{}", MESSAGE_PREFIX, message_count);
440                while publication.offer(
441                    message.as_bytes(),
442                    Handlers::no_reserved_value_supplier_handler(),
443                ) <= 0
444                {
445                    thread::sleep(Duration::from_millis(10));
446                }
447                message_count += 1;
448                if message_count % 10_000 == 0 {
449                    info!(
450                        "Published {} messages [position={}]",
451                        message_count,
452                        publication.position()
453                    );
454                }
455                // slow down publishing so can catch up
456                if message_count > 10_000 {
457                    // ensure archiver is caught up
458                    while !publication.is_archive_position_with(0) {
459                        thread::sleep(Duration::from_micros(300));
460                    }
461                    caught_up_count += 1;
462                }
463            }
464            assert!(caught_up_count > 0);
465            info!("Publisher thread terminated");
466        });
467        Ok((session_id, publisher_thread))
468    }
469
470    fn replay_merge_subscription(
471        archive: &AeronArchive,
472        aeron: Aeron,
473        session_id: i32,
474    ) -> Result<(), AeronCError> {
475        // let replay_channel = format!("aeron:udp?control-mode=manual|session-id={session_id}");
476        let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
477        info!("replay channel {:?}", replay_channel);
478
479        let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
480        info!("replay destination {:?}", replay_destination);
481
482        let live_destination =
483            format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
484                .into_c_string();
485        info!("live destination {:?}", live_destination);
486
487        let counters_reader = aeron.counters_reader();
488        let mut counter_id = -1;
489
490        while counter_id < 0 {
491            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
492        }
493        info!(
494            "counter id {} {:?}",
495            counter_id,
496            counters_reader.get_counter_label(counter_id, 1024)
497        );
498        info!(
499            "counter id {} position={:?}",
500            counter_id,
501            counters_reader.get_counter_value(counter_id)
502        );
503
504        // let recording_id = Cell::new(-1);
505        // let start_position = Cell::new(-1);
506
507        // let mut count = 0;
508        // assert!(
509        //     archive.list_recordings_once(&mut count, 0, 1000, |descriptor| {
510        //         info!("Recording descriptor: {:?}", descriptor);
511        //         recording_id.set(descriptor.recording_id);
512        //         start_position.set(descriptor.start_position);
513        //         assert_eq!(descriptor.session_id, session_id);
514        //         assert_eq!(descriptor.stream_id, STREAM_ID);
515        //     })? >= 0
516        // );
517        // assert!(count > 0);
518        // assert!(recording_id.get() >= 0);
519
520        // let record_id = RecordingPos::get_recording_id(&aeron.counters_reader(), counter_id)?;
521        // assert_eq!(recording_id.get(), record_id);
522        //
523        // let recording_id = recording_id.get();
524        // let start_position = start_position.get();
525        let start_position = 0;
526        let recording_id = RecordingPos::get_recording_id_block(
527            &aeron.counters_reader(),
528            counter_id,
529            Duration::from_secs(5),
530        )?;
531
532        let subscribe_channel =
533            format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
534        info!("subscribe channel {:?}", subscribe_channel);
535        let subscription = aeron.add_subscription(
536            &subscribe_channel,
537            STREAM_ID,
538            Handlers::no_available_image_handler(),
539            Handlers::no_unavailable_image_handler(),
540            Duration::from_secs(5),
541        )?;
542
543        let replay_merge = AeronArchiveReplayMerge::new(
544            &subscription,
545            &archive,
546            &replay_channel,
547            &replay_destination,
548            &live_destination,
549            recording_id,
550            start_position,
551            Aeron::epoch_clock(),
552            10_000,
553        )?;
554
555        info!(
556            "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
557            recording_id,
558            start_position,
559            subscribe_channel,
560            &replay_channel,
561            &replay_destination,
562            &live_destination
563        );
564
565        // media_driver
566        //     .run_aeron_stats()
567        //     .expect("Failed to run aeron stats");
568
569        // info!("Waiting for subscription to connect...");
570        // while !subscription.is_connected() {
571        //     thread::sleep(Duration::from_millis(100));
572        // }
573        // info!("Subscription connected");
574
575        info!(
576            "about to start_replay [maxRecordPosition={:?}]",
577            archive.get_max_recorded_position(recording_id)
578        );
579
580        let mut reply_count = 0;
581        while !replay_merge.is_merged() {
582            assert!(!replay_merge.has_failed());
583            if replay_merge.poll_once(
584                |buffer, _header| {
585                    reply_count += 1;
586                    if reply_count % 10_000 == 0 {
587                        info!(
588                            "replay-merge [count={}, isMerged={}, isLive={}]",
589                            reply_count,
590                            replay_merge.is_merged(),
591                            replay_merge.is_live_added()
592                        );
593                    }
594                },
595                100,
596            )? == 0
597            {
598                let err = archive.poll_for_error_response_as_string(4096)?;
599                if !err.is_empty() {
600                    panic!("{}", err);
601                }
602                if Aeron::errmsg().len() > 0 && "no error" != Aeron::errmsg() {
603                    panic!("{}", Aeron::errmsg());
604                }
605                archive.poll_for_recording_signals()?;
606                thread::sleep(Duration::from_millis(100));
607            }
608        }
609        assert!(!replay_merge.has_failed());
610        assert!(replay_merge.is_live_added());
611        assert!(reply_count > 10_000);
612        Ok(())
613    }
614
615    #[test]
616    fn version_check() {
617        let major = unsafe { crate::aeron_version_major() };
618        let minor = unsafe { crate::aeron_version_minor() };
619        let patch = unsafe { crate::aeron_version_patch() };
620
621        let aeron_version = format!("{}.{}.{}", major, minor, patch);
622
623        let cargo_version = "1.48.6";
624        assert_eq!(aeron_version, cargo_version);
625    }
626
627    use std::thread;
628
629    pub fn start_aeron_archive() -> Result<
630        (
631            Aeron,
632            AeronArchiveContext,
633            EmbeddedArchiveMediaDriverProcess,
634        ),
635        Box<dyn Error>,
636    > {
637        let id = Aeron::nano_clock();
638        let aeron_dir = format!("target/aeron/{}/shm", id);
639        let archive_dir = format!("target/aeron/{}/archive", id);
640
641        let request_port = find_unused_udp_port(8000).expect("Could not find port");
642        let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
643        let recording_event_port =
644            find_unused_udp_port(response_port + 1).expect("Could not find port");
645        let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
646        let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
647        let recording_events_channel =
648            &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
649        assert_ne!(request_control_channel, response_control_channel);
650
651        let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
652            &aeron_dir,
653            &archive_dir,
654            request_control_channel,
655            response_control_channel,
656            recording_events_channel,
657        )
658        .expect("Failed to start Java process");
659
660        let aeron_context = AeronContext::new()?;
661        aeron_context.set_dir(&aeron_dir.into_c_string())?;
662        aeron_context.set_client_name(&"test".into_c_string())?;
663        aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
664            AeronPublicationErrorFrameHandlerLogger,
665        )))?;
666        let error_handler = Handler::leak(ErrorCount::default());
667        aeron_context.set_error_handler(Some(&error_handler))?;
668        let aeron = Aeron::new(&aeron_context)?;
669        aeron.start()?;
670
671        let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
672            &aeron,
673            request_control_channel,
674            response_control_channel,
675            recording_events_channel,
676        )?;
677        archive_context.set_error_handler(Some(&error_handler))?;
678        Ok((aeron, archive_context, archive_media_driver))
679    }
680
681    #[test]
682    #[serial]
683    pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
684        let _ = env_logger::Builder::new()
685            .is_test(true)
686            .filter_level(log::LevelFilter::Info)
687            .try_init();
688        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
689            .expect("failed to kill all java processes");
690
691        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
692
693        assert!(!aeron.is_closed());
694
695        info!("connected to aeron");
696
697        let archive_connector =
698            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
699        let archive = archive_connector
700            .poll_blocking(Duration::from_secs(30))
701            .expect("failed to connect to aeron archive media driver");
702
703        assert!(archive.get_archive_id() > 0);
704
705        let channel = AERON_IPC_STREAM;
706        let stream_id = 10;
707
708        let subscription_id =
709            archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
710
711        assert!(subscription_id >= 0);
712        info!("subscription id {}", subscription_id);
713
714        let publication = aeron
715            .async_add_exclusive_publication(channel, stream_id)?
716            .poll_blocking(Duration::from_secs(5))?;
717
718        for i in 0..11 {
719            while publication.offer(
720                "123456".as_bytes(),
721                Handlers::no_reserved_value_supplier_handler(),
722            ) <= 0
723            {
724                sleep(Duration::from_millis(50));
725                archive.poll_for_recording_signals()?;
726                let err = archive.poll_for_error_response_as_string(4096)?;
727                if !err.is_empty() {
728                    panic!("{}", err);
729                }
730                archive.idle();
731            }
732            info!("sent message {i} [test_aeron_archive]");
733        }
734
735        archive.idle();
736        let session_id = publication.get_constants()?.session_id;
737        info!("publication session id {}", session_id);
738        // since this is single threaded need to make sure it did write to archiver, usually not required in multi-proccess app
739        let stop_position = publication.position();
740        info!(
741            "publication stop position {} [publication={:?}]",
742            stop_position,
743            publication.get_constants()
744        );
745        let counters_reader = aeron.counters_reader();
746        info!("counters reader ready {:?}", counters_reader);
747
748        let mut counter_id = -1;
749
750        let start = Instant::now();
751        while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
752            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
753            info!("counter id {}", counter_id);
754        }
755
756        assert!(counter_id >= 0);
757
758        info!("counter id {counter_id}, session id {session_id}");
759        while counters_reader.get_counter_value(counter_id) < stop_position {
760            info!(
761                "current archive publication stop position {}",
762                counters_reader.get_counter_value(counter_id)
763            );
764            sleep(Duration::from_millis(50));
765        }
766        info!(
767            "found archive publication stop position {}",
768            counters_reader.get_counter_value(counter_id)
769        );
770
771        archive.stop_recording_channel_and_stream(channel, stream_id)?;
772        drop(publication);
773
774        info!("list recordings");
775        let found_recording_id = Cell::new(-1);
776        let start_pos = Cell::new(-1);
777        let end_pos = Cell::new(-1);
778        let start = Instant::now();
779        while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
780            let mut count = 0;
781            archive.list_recordings_for_uri_once(
782                &mut count,
783                0,
784                i32::MAX,
785                channel,
786                stream_id,
787                |d: AeronArchiveRecordingDescriptor| {
788                    assert_eq!(d.stream_id, stream_id);
789                    info!("found recording {:#?}", d);
790                    info!(
791                        "strippedChannel={}, originalChannel={}",
792                        d.stripped_channel(),
793                        d.original_channel()
794                    );
795                    if d.stop_position > d.start_position && d.stop_position > 0 {
796                        found_recording_id.set(d.recording_id);
797                        start_pos.set(d.start_position);
798                        end_pos.set(d.stop_position);
799                    }
800
801                    // verify clone_struct works
802                    let copy = d.clone_struct();
803                    assert_eq!(copy.deref(), d.deref());
804                    assert_eq!(copy.recording_id, d.recording_id);
805                    assert_eq!(copy.control_session_id, d.control_session_id);
806                    assert_eq!(copy.mtu_length, d.mtu_length);
807                    assert_eq!(copy.source_identity_length, d.source_identity_length);
808                },
809            )?;
810            archive.poll_for_recording_signals()?;
811            let err = archive.poll_for_error_response_as_string(4096)?;
812            if !err.is_empty() {
813                panic!("{}", err);
814            }
815        }
816        assert!(start.elapsed() < Duration::from_secs(5));
817        info!("start replay");
818        let params = AeronArchiveReplayParams::new(
819            0,
820            i32::MAX,
821            start_pos.get(),
822            end_pos.get() - start_pos.get(),
823            0,
824            0,
825        )?;
826        info!("replay params {:#?}", params);
827        let replay_stream_id = 45;
828        let replay_session_id =
829            archive.start_replay(found_recording_id.get(), channel, replay_stream_id, &params)?;
830        let session_id = replay_session_id as i32;
831
832        info!("replay session id {}", replay_session_id);
833        info!("session id {}", session_id);
834        let channel_replay =
835            format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
836        info!("archive id: {}", archive.get_archive_id());
837
838        info!("add subscription {:?}", channel_replay);
839        let subscription = aeron
840            .async_add_subscription(
841                &channel_replay,
842                replay_stream_id,
843                Some(&Handler::leak(AeronAvailableImageLogger)),
844                Some(&Handler::leak(AeronUnavailableImageLogger)),
845            )?
846            .poll_blocking(Duration::from_secs(10))?;
847
848        #[derive(Default)]
849        struct FragmentHandler {
850            count: Cell<usize>,
851        }
852
853        impl AeronFragmentHandlerCallback for FragmentHandler {
854            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
855                assert_eq!(buffer, "123456".as_bytes());
856
857                // Update count (using Cell for interior mutability)
858                self.count.set(self.count.get() + 1);
859            }
860        }
861
862        let poll = Handler::leak(FragmentHandler::default());
863
864        let start = Instant::now();
865        while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
866        {
867            let err = archive.poll_for_error_response_as_string(4096)?;
868            if !err.is_empty() {
869                panic!("{}", err);
870            }
871        }
872        assert!(
873            start.elapsed() < Duration::from_secs(10),
874            "messages not received {:?}",
875            poll.count
876        );
877        info!("aeron {:?}", aeron);
878        info!("ctx {:?}", archive_context);
879        assert_eq!(11, poll.count.get());
880        Ok(())
881    }
882
883    #[test]
884    #[serial]
885    fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
886        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
887        let archive_connector =
888            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
889        let archive = archive_connector
890            .poll_blocking(Duration::from_secs(30))
891            .expect("failed to connect to archive");
892
893        let invalid_channel = "invalid:channel".into_c_string();
894        let result =
895            archive.start_recording(&invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
896        assert!(
897            result.is_err(),
898            "Expected error when starting recording with an invalid channel"
899        );
900        Ok(())
901    }
902
903    #[test]
904    #[serial]
905    fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
906        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
907        let archive_connector =
908            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
909        let archive = archive_connector
910            .poll_blocking(Duration::from_secs(30))
911            .expect("failed to connect to archive");
912
913        let nonexistent_channel = &"aeron:udp?endpoint=localhost:9999".into_c_string();
914        let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
915        assert!(
916            result.is_err(),
917            "Expected error when stopping recording on a non-existent channel"
918        );
919        Ok(())
920    }
921
922    #[test]
923    #[serial]
924    fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
925        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
926        let archive_connector =
927            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
928        let archive = archive_connector
929            .poll_blocking(Duration::from_secs(30))
930            .expect("failed to connect to archive");
931
932        let invalid_recording_id = -999;
933        let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
934        let result = archive.start_replay(
935            invalid_recording_id,
936            &"aeron:udp?endpoint=localhost:8888".into_c_string(),
937            STREAM_ID,
938            &params,
939        );
940        assert!(
941            result.is_err(),
942            "Expected error when starting replay with an invalid recording id"
943        );
944        Ok(())
945    }
946
947    #[test]
948    #[serial]
949    fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
950        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
951        let archive_connector =
952            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
953        let archive = archive_connector
954            .poll_blocking(Duration::from_secs(30))
955            .expect("failed to connect to archive");
956
957        drop(archive);
958
959        let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
960        let new_archive = archive_connector
961            .poll_blocking(Duration::from_secs(30))
962            .expect("failed to reconnect to archive");
963        assert!(
964            new_archive.get_archive_id() > 0,
965            "Reconnected archive should have a valid archive id"
966        );
967
968        drop(media_driver);
969        Ok(())
970    }
971}