rusteron_archive/
lib.rs

1/**/
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
13//! - **`backtrace`** - When enabled will log a backtrace for each AeronCError
14//! - **`extra-logging`** - When enabled will log when resource is created and destroyed. useful if your seeing a segfault due to a resource being closed
15//! - **`precompile`** - When enabled will use precompiled c code instead of requiring cmake and java to me installed
16
17pub mod bindings {
18    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
19}
20
21use bindings::*;
22use std::cell::Cell;
23use std::os::raw::c_int;
24use std::time::{Duration, Instant};
25
26pub mod testing;
27
28include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
29include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
30
31pub type SourceLocation = bindings::aeron_archive_source_location_t;
32pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
33    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
34pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
35    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
36
37pub struct NoOpAeronIdleStrategyFunc;
38
39impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
40    fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
41}
42
43pub struct RecordingPos;
44impl RecordingPos {
45    pub fn find_counter_id_by_session(
46        counter_reader: &AeronCountersReader,
47        session_id: i32,
48    ) -> i32 {
49        unsafe {
50            aeron_archive_recording_pos_find_counter_id_by_session_id(
51                counter_reader.get_inner(),
52                session_id,
53            )
54        }
55    }
56    pub fn find_counter_id_by_recording_id(
57        counter_reader: &AeronCountersReader,
58        recording_id: i64,
59    ) -> i32 {
60        unsafe {
61            aeron_archive_recording_pos_find_counter_id_by_recording_id(
62                counter_reader.get_inner(),
63                recording_id,
64            )
65        }
66    }
67
68    /// Return the recordingId embedded in the key of the given counter
69    /// if it is indeed a "recording position" counter. Otherwise return -1.
70    pub fn get_recording_id_block(
71        counters_reader: &AeronCountersReader,
72        counter_id: i32,
73        wait: Duration,
74    ) -> Result<i64, AeronCError> {
75        let mut result = Self::get_recording_id(counters_reader, counter_id);
76        let instant = Instant::now();
77
78        while result.is_err() && instant.elapsed() < wait {
79            result = Self::get_recording_id(counters_reader, counter_id);
80            #[cfg(debug_assertions)]
81            std::thread::sleep(Duration::from_millis(10));
82        }
83
84        return result;
85    }
86
87    /// Return the recordingId embedded in the key of the given counter
88    /// if it is indeed a "recording position" counter. Otherwise return -1.
89    pub fn get_recording_id(
90        counters_reader: &AeronCountersReader,
91        counter_id: i32,
92    ) -> Result<i64, AeronCError> {
93        /// The type id for an Aeron Archive recording position counter.
94        /// In Aeron Java, this is AeronCounters.ARCHIVE_RECORDING_POSITION_TYPE_ID (which is typically 100).
95        pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
96
97        /// from Aeron Java code
98        pub const RECORD_ALLOCATED: i32 = 1;
99
100        /// A constant to mean "no valid recording ID".
101        pub const NULL_RECORDING_ID: i64 = -1;
102
103        if counter_id < 0 {
104            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
105        }
106
107        let state = counters_reader.counter_state(counter_id)?;
108        if state != RECORD_ALLOCATED {
109            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
110        }
111
112        let type_id = counters_reader.counter_type_id(counter_id)?;
113        if type_id != RECORDING_POSITION_TYPE_ID {
114            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
115        }
116
117        // Read the key area. For a RECORDING_POSITION_TYPE_ID counter:
118        //    - offset 0..8 => the i64 recording_id
119        //    - offset 8..12 => the session_id (int)
120        //    etc...
121        // only need the first 8 bytes to get the recordingId.
122        let recording_id = Cell::new(-1);
123        counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
124            if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
125                let mut val = [0u8; 8];
126                val.copy_from_slice(&key[0..8]);
127                let Ok(value) = i64::from_le_bytes(val).try_into();
128                recording_id.set(value);
129            }
130        });
131        let recording_id = recording_id.get();
132        if recording_id < 0 {
133            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
134        }
135
136        Ok(recording_id)
137    }
138}
139
140unsafe extern "C" fn default_encoded_credentials(
141    _clientd: *mut std::os::raw::c_void,
142) -> *mut aeron_archive_encoded_credentials_t {
143    // Allocate a zeroed instance of `aeron_archive_encoded_credentials_t`
144    let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
145        data: std::ptr::null(),
146        length: 0,
147    });
148    Box::into_raw(empty_credentials)
149}
150
151impl AeronArchive {
152    pub fn aeron(&self) -> Aeron {
153        self.get_archive_context().get_aeron()
154    }
155}
156
157impl AeronArchiveAsyncConnect {
158    #[inline]
159    /// recommend using this method instead of standard `new` as it will link the archive to aeron so if a drop occurs archive is dropped before aeron
160    pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
161        let resource_async = Self::new(ctx)?;
162        resource_async.inner.add_dependency(aeron.clone());
163        Ok(resource_async)
164    }
165}
166
167macro_rules! impl_archive_position_methods {
168    ($pub_type:ty) => {
169        impl $pub_type {
170            /// Retrieves the current active live archive position using the Aeron counters.
171            /// Returns an error if not found.
172            pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
173                if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
174                    let counter_reader = &aeron.counters_reader();
175                    self.get_archive_position_with(counter_reader)
176                } else {
177                    Err(AeronCError::from_code(-1))
178                }
179            }
180
181            /// Retrieves the current active live archive position using the provided counter reader.
182            /// Returns an error if not found.
183            pub fn get_archive_position_with(
184                &self,
185                counters: &AeronCountersReader,
186            ) -> Result<i64, AeronCError> {
187                let session_id = self.get_constants()?.session_id();
188                let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
189                if counter_id < 0 {
190                    return Err(AeronCError::from_code(counter_id));
191                }
192                let position = counters.get_counter_value(counter_id);
193                if position < 0 {
194                    return Err(AeronCError::from_code(position as i32));
195                }
196                Ok(position)
197            }
198
199            /// Checks if the publication's current position is within a specified inclusive length
200            /// of the archive position.
201            pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
202                let archive_position = self.get_archive_position().unwrap_or(-1);
203                if archive_position < 0 {
204                    return false;
205                }
206                self.position() - archive_position <= length_inclusive as i64
207            }
208        }
209    };
210}
211
212impl_archive_position_methods!(AeronPublication);
213impl_archive_position_methods!(AeronExclusivePublication);
214
215impl AeronArchiveContext {
216    // The method below sets no credentials supplier, which is essential for the operation
217    // of the Aeron Archive Context. The `set_credentials_supplier` must be set to prevent
218    // segmentation faults in the C bindings.
219    pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
220        self.set_credentials_supplier(
221            Some(default_encoded_credentials),
222            None,
223            None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
224        )
225    }
226
227    /// This method creates a new `AeronArchiveContext` with a no-op credentials supplier.
228    /// If you do not set a credentials supplier, it will segfault.
229    /// This method ensures that a non-functional credentials supplier is set to avoid the segfault.
230    pub fn new_with_no_credentials_supplier(
231        aeron: &Aeron,
232        request_control_channel: &str,
233        response_control_channel: &str,
234        recording_events_channel: &str,
235    ) -> Result<AeronArchiveContext, AeronCError> {
236        let context = Self::new()?;
237        context.set_no_credentials_supplier()?;
238        context.set_aeron(aeron)?;
239        context.set_control_request_channel(&request_control_channel.into_c_string())?;
240        context.set_control_response_channel(&response_control_channel.into_c_string())?;
241        context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
242        // see https://github.com/gsrxyz/rusteron/issues/18
243        context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
244        Ok(context)
245    }
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251    use log::{error, info};
252
253    use crate::testing::EmbeddedArchiveMediaDriverProcess;
254    use serial_test::serial;
255    use std::cell::Cell;
256    use std::error;
257    use std::error::Error;
258    use std::str::FromStr;
259    use std::sync::atomic::{AtomicBool, Ordering};
260    use std::sync::Arc;
261    use std::thread::{sleep, JoinHandle};
262    use std::time::{Duration, Instant};
263
264    #[derive(Default, Debug)]
265    struct ErrorCount {
266        error_count: usize,
267    }
268
269    impl AeronErrorHandlerCallback for ErrorCount {
270        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
271            error!("Aeron error {}: {}", error_code, msg);
272            self.error_count += 1;
273        }
274    }
275
276    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
277    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
278    pub const ARCHIVE_RECORDING_EVENTS: &str =
279        "aeron:udp?control-mode=dynamic|control=localhost:8012";
280
281    #[test]
282    fn test_uri_string_builder() -> Result<(), AeronCError> {
283        let builder = AeronUriStringBuilder::default();
284
285        builder.init_new()?;
286        builder
287            .media(Media::Udp)? // very important to set media else set_initial_position will give an error of -1
288            .mtu_length(1024 * 64)?
289            .set_initial_position(127424949617280, 1182294755, 65536)?;
290        let uri = builder.build(1024)?;
291        assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);
292
293        builder.init_new()?;
294        let uri = builder
295            .media(Media::Udp)?
296            .control_mode(ControlMode::Dynamic)?
297            .reliable(false)?
298            .ttl(2)?
299            .endpoint("localhost:1235")?
300            .control("localhost:1234")?
301            .build(1024)?;
302        assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);
303
304        let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
305            .ttl(5)?
306            .build(1024)?;
307
308        assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);
309
310        let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;
311
312        assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);
313
314        Ok(())
315    }
316
317    pub const STREAM_ID: i32 = 1033;
318    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
319    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
320    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
321    pub const LIVE_ENDPOINT: &str = "localhost:23267";
322    pub const REPLAY_ENDPOINT: &str = "localhost:0";
323    // pub const REPLAY_ENDPOINT: &str = "localhost:23268";
324
325    #[test]
326    #[serial]
327    fn test_simple_replay_merge() -> Result<(), AeronCError> {
328        let _ = env_logger::Builder::new()
329            .is_test(true)
330            .filter_level(log::LevelFilter::Info)
331            .try_init();
332
333        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
334            .expect("failed to kill all java processes");
335
336        assert!(is_udp_port_available(23265));
337        assert!(is_udp_port_available(23266));
338        assert!(is_udp_port_available(23267));
339        assert!(is_udp_port_available(23268));
340        let id = Aeron::nano_clock();
341        let aeron_dir = format!("target/aeron/{}/shm", id);
342        let archive_dir = format!("target/aeron/{}/archive", id);
343
344        info!("starting archive media driver");
345        let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
346            &aeron_dir,
347            &format!("{}/archive", aeron_dir),
348            ARCHIVE_CONTROL_REQUEST,
349            ARCHIVE_CONTROL_RESPONSE,
350            ARCHIVE_RECORDING_EVENTS,
351        )
352        .expect("Failed to start embedded media driver");
353
354        info!("connecting to archive");
355        let (archive, aeron) = media_driver
356            .archive_connect()
357            .expect("Could not connect to archive client");
358
359        let running = Arc::new(AtomicBool::new(true));
360
361        info!("connected to archive, adding publication");
362        assert!(!aeron.is_closed());
363
364        let (session_id, publisher_thread) =
365            reply_merge_publisher(&archive, aeron.clone(), running.clone())?;
366
367        {
368            let context = AeronContext::new()?;
369            context.set_dir(&media_driver.aeron_dir)?;
370            let error_handler = Handler::leak(ErrorCount::default());
371            context.set_error_handler(Some(&error_handler))?;
372            let aeron = Aeron::new(&context)?;
373            aeron.start()?;
374            let aeron_archive_context = archive.get_archive_context();
375            let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
376                &aeron,
377                aeron_archive_context.get_control_request_channel(),
378                aeron_archive_context.get_control_response_channel(),
379                aeron_archive_context.get_recording_events_channel(),
380            )?;
381            aeron_archive_context.set_error_handler(Some(&error_handler))?;
382            let archive = AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
383                .poll_blocking(Duration::from_secs(30))
384                .expect("failed to connect to archive");
385            replay_merge_subscription(&archive, aeron.clone(), session_id)?;
386        }
387
388        running.store(false, Ordering::Release);
389        publisher_thread.join().unwrap();
390
391        Ok(())
392    }
393
394    fn reply_merge_publisher(
395        archive: &AeronArchive,
396        aeron: Aeron,
397        running: Arc<AtomicBool>,
398    ) -> Result<(i32, JoinHandle<()>), AeronCError> {
399        let publication = aeron.add_publication(
400            // &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536|fc=tagged,g:99901/1,t:5s"),
401            &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
402                .into_c_string(),
403            STREAM_ID,
404            Duration::from_secs(5),
405        )?;
406
407        info!(
408            "publication {} [status={:?}]",
409            publication.channel(),
410            publication.channel_status()
411        );
412        assert_eq!(1, publication.channel_status());
413
414        let session_id = publication.session_id();
415        let recording_channel = format!(
416            // "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}|gtag=99901"
417            "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
418        );
419        info!("recording channel {}", recording_channel);
420        archive.start_recording(
421            &recording_channel.into_c_string(),
422            STREAM_ID,
423            SOURCE_LOCATION_REMOTE,
424            true,
425        )?;
426
427        info!("waiting for publisher to be connected");
428        while !publication.is_connected() {
429            thread::sleep(Duration::from_millis(100));
430        }
431        info!("publisher to be connected");
432        let counters_reader = aeron.counters_reader();
433        let mut caught_up_count = 0;
434        let publisher_thread = thread::spawn(move || {
435            let mut message_count = 0;
436
437            while running.load(Ordering::Acquire) {
438                let message = format!("{}{}", MESSAGE_PREFIX, message_count);
439                while publication.offer(
440                    message.as_bytes(),
441                    Handlers::no_reserved_value_supplier_handler(),
442                ) <= 0
443                {
444                    thread::sleep(Duration::from_millis(10));
445                }
446                message_count += 1;
447                if message_count % 10_000 == 0 {
448                    info!(
449                        "Published {} messages [position={}]",
450                        message_count,
451                        publication.position()
452                    );
453                }
454                // slow down publishing so can catch up
455                if message_count > 10_000 {
456                    // ensure archiver is caught up
457                    while !publication.is_archive_position_with(0) {
458                        thread::sleep(Duration::from_micros(300));
459                    }
460                    caught_up_count += 1;
461                }
462            }
463            assert!(caught_up_count > 0);
464            info!("Publisher thread terminated");
465        });
466        Ok((session_id, publisher_thread))
467    }
468
469    fn replay_merge_subscription(
470        archive: &AeronArchive,
471        aeron: Aeron,
472        session_id: i32,
473    ) -> Result<(), AeronCError> {
474        // let replay_channel = format!("aeron:udp?control-mode=manual|session-id={session_id}");
475        let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
476        info!("replay channel {:?}", replay_channel);
477
478        let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
479        info!("replay destination {:?}", replay_destination);
480
481        let live_destination =
482            format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
483                .into_c_string();
484        info!("live destination {:?}", live_destination);
485
486        let counters_reader = aeron.counters_reader();
487        let mut counter_id = -1;
488
489        while counter_id < 0 {
490            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
491        }
492        info!(
493            "counter id {} {:?}",
494            counter_id,
495            counters_reader.get_counter_label(counter_id, 1024)
496        );
497        info!(
498            "counter id {} position={:?}",
499            counter_id,
500            counters_reader.get_counter_value(counter_id)
501        );
502
503        // let recording_id = Cell::new(-1);
504        // let start_position = Cell::new(-1);
505
506        // let mut count = 0;
507        // assert!(
508        //     archive.list_recordings_once(&mut count, 0, 1000, |descriptor| {
509        //         info!("Recording descriptor: {:?}", descriptor);
510        //         recording_id.set(descriptor.recording_id);
511        //         start_position.set(descriptor.start_position);
512        //         assert_eq!(descriptor.session_id, session_id);
513        //         assert_eq!(descriptor.stream_id, STREAM_ID);
514        //     })? >= 0
515        // );
516        // assert!(count > 0);
517        // assert!(recording_id.get() >= 0);
518
519        // let record_id = RecordingPos::get_recording_id(&aeron.counters_reader(), counter_id)?;
520        // assert_eq!(recording_id.get(), record_id);
521        //
522        // let recording_id = recording_id.get();
523        // let start_position = start_position.get();
524        let start_position = 0;
525        let recording_id = RecordingPos::get_recording_id_block(
526            &aeron.counters_reader(),
527            counter_id,
528            Duration::from_secs(5),
529        )?;
530
531        let subscribe_channel =
532            format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
533        info!("subscribe channel {:?}", subscribe_channel);
534        let subscription = aeron.add_subscription(
535            &subscribe_channel,
536            STREAM_ID,
537            Handlers::no_available_image_handler(),
538            Handlers::no_unavailable_image_handler(),
539            Duration::from_secs(5),
540        )?;
541
542        let replay_merge = AeronArchiveReplayMerge::new(
543            &subscription,
544            &archive,
545            &replay_channel,
546            &replay_destination,
547            &live_destination,
548            recording_id,
549            start_position,
550            Aeron::epoch_clock(),
551            10_000,
552        )?;
553
554        info!(
555            "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
556            recording_id,
557            start_position,
558            subscribe_channel,
559            &replay_channel,
560            &replay_destination,
561            &live_destination
562        );
563
564        // media_driver
565        //     .run_aeron_stats()
566        //     .expect("Failed to run aeron stats");
567
568        // info!("Waiting for subscription to connect...");
569        // while !subscription.is_connected() {
570        //     thread::sleep(Duration::from_millis(100));
571        // }
572        // info!("Subscription connected");
573
574        info!(
575            "about to start_replay [maxRecordPosition={:?}]",
576            archive.get_max_recorded_position(recording_id)
577        );
578
579        let mut reply_count = 0;
580        while !replay_merge.is_merged() {
581            assert!(!replay_merge.has_failed());
582            if replay_merge.poll_once(
583                |buffer, _header| {
584                    reply_count += 1;
585                    if reply_count % 10_000 == 0 {
586                        info!(
587                            "replay-merge [count={}, isMerged={}, isLive={}]",
588                            reply_count,
589                            replay_merge.is_merged(),
590                            replay_merge.is_live_added()
591                        );
592                    }
593                },
594                100,
595            )? == 0
596            {
597                let err = archive.poll_for_error_response_as_string(4096)?;
598                if !err.is_empty() {
599                    panic!("{}", err);
600                }
601                if Aeron::errmsg().len() > 0 && "no error" != Aeron::errmsg() {
602                    panic!("{}", Aeron::errmsg());
603                }
604                archive.poll_for_recording_signals()?;
605                thread::sleep(Duration::from_millis(100));
606            }
607        }
608        assert!(!replay_merge.has_failed());
609        assert!(replay_merge.is_live_added());
610        assert!(reply_count > 10_000);
611        Ok(())
612    }
613
614    #[test]
615    fn version_check() {
616        let major = unsafe { crate::aeron_version_major() };
617        let minor = unsafe { crate::aeron_version_minor() };
618        let patch = unsafe { crate::aeron_version_patch() };
619
620        let aeron_version = format!("{}.{}.{}", major, minor, patch);
621
622        let cargo_version = "1.48.5";
623        assert_eq!(aeron_version, cargo_version);
624    }
625
626    use std::thread;
627
628    pub fn start_aeron_archive() -> Result<
629        (
630            Aeron,
631            AeronArchiveContext,
632            EmbeddedArchiveMediaDriverProcess,
633        ),
634        Box<dyn Error>,
635    > {
636        let id = Aeron::nano_clock();
637        let aeron_dir = format!("target/aeron/{}/shm", id);
638        let archive_dir = format!("target/aeron/{}/archive", id);
639
640        let request_port = find_unused_udp_port(8000).expect("Could not find port");
641        let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
642        let recording_event_port =
643            find_unused_udp_port(response_port + 1).expect("Could not find port");
644        let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
645        let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
646        let recording_events_channel =
647            &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
648        assert_ne!(request_control_channel, response_control_channel);
649
650        let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
651            &aeron_dir,
652            &archive_dir,
653            request_control_channel,
654            response_control_channel,
655            recording_events_channel,
656        )
657        .expect("Failed to start Java process");
658
659        let aeron_context = AeronContext::new()?;
660        aeron_context.set_dir(&aeron_dir.into_c_string())?;
661        aeron_context.set_client_name(&"test".into_c_string())?;
662        aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
663            AeronPublicationErrorFrameHandlerLogger,
664        )))?;
665        let error_handler = Handler::leak(ErrorCount::default());
666        aeron_context.set_error_handler(Some(&error_handler))?;
667        let aeron = Aeron::new(&aeron_context)?;
668        aeron.start()?;
669
670        let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
671            &aeron,
672            request_control_channel,
673            response_control_channel,
674            recording_events_channel,
675        )?;
676        archive_context.set_error_handler(Some(&error_handler))?;
677        Ok((aeron, archive_context, archive_media_driver))
678    }
679
680    #[test]
681    #[serial]
682    pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
683        let _ = env_logger::Builder::new()
684            .is_test(true)
685            .filter_level(log::LevelFilter::Info)
686            .try_init();
687        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
688            .expect("failed to kill all java processes");
689
690        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
691
692        assert!(!aeron.is_closed());
693
694        info!("connected to aeron");
695
696        let archive_connector =
697            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
698        let archive = archive_connector
699            .poll_blocking(Duration::from_secs(30))
700            .expect("failed to connect to aeron archive media driver");
701
702        assert!(archive.get_archive_id() > 0);
703
704        let channel = AERON_IPC_STREAM;
705        let stream_id = 10;
706
707        let subscription_id =
708            archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
709
710        assert!(subscription_id >= 0);
711        info!("subscription id {}", subscription_id);
712
713        let publication = aeron
714            .async_add_exclusive_publication(channel, stream_id)?
715            .poll_blocking(Duration::from_secs(5))?;
716
717        for i in 0..11 {
718            while publication.offer(
719                "123456".as_bytes(),
720                Handlers::no_reserved_value_supplier_handler(),
721            ) <= 0
722            {
723                sleep(Duration::from_millis(50));
724                archive.poll_for_recording_signals()?;
725                let err = archive.poll_for_error_response_as_string(4096)?;
726                if !err.is_empty() {
727                    panic!("{}", err);
728                }
729                archive.idle();
730            }
731            info!("sent message {i} [test_aeron_archive]");
732        }
733
734        archive.idle();
735        let session_id = publication.get_constants()?.session_id;
736        info!("publication session id {}", session_id);
737        // since this is single threaded need to make sure it did write to archiver, usually not required in multi-proccess app
738        let stop_position = publication.position();
739        info!(
740            "publication stop position {} [publication={:?}]",
741            stop_position,
742            publication.get_constants()
743        );
744        let counters_reader = aeron.counters_reader();
745        info!("counters reader ready {:?}", counters_reader);
746
747        let mut counter_id = -1;
748
749        let start = Instant::now();
750        while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
751            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
752            info!("counter id {}", counter_id);
753        }
754
755        assert!(counter_id >= 0);
756
757        info!("counter id {counter_id}, session id {session_id}");
758        while counters_reader.get_counter_value(counter_id) < stop_position {
759            info!(
760                "current archive publication stop position {}",
761                counters_reader.get_counter_value(counter_id)
762            );
763            sleep(Duration::from_millis(50));
764        }
765        info!(
766            "found archive publication stop position {}",
767            counters_reader.get_counter_value(counter_id)
768        );
769
770        archive.stop_recording_channel_and_stream(channel, stream_id)?;
771        drop(publication);
772
773        info!("list recordings");
774        let found_recording_id = Cell::new(-1);
775        let start_pos = Cell::new(-1);
776        let end_pos = Cell::new(-1);
777        let start = Instant::now();
778        while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
779            let mut count = 0;
780            archive.list_recordings_for_uri_once(
781                &mut count,
782                0,
783                i32::MAX,
784                channel,
785                stream_id,
786                |d: AeronArchiveRecordingDescriptor| {
787                    assert_eq!(d.stream_id, stream_id);
788                    info!("found recording {:#?}", d);
789                    info!(
790                        "strippedChannel={}, originalChannel={}",
791                        d.stripped_channel(),
792                        d.original_channel()
793                    );
794                    if d.stop_position > d.start_position && d.stop_position > 0 {
795                        found_recording_id.set(d.recording_id);
796                        start_pos.set(d.start_position);
797                        end_pos.set(d.stop_position);
798                    }
799
800                    // verify clone_struct works
801                    let copy = d.clone_struct();
802                    assert_eq!(copy.deref(), d.deref());
803                    assert_eq!(copy.recording_id, d.recording_id);
804                    assert_eq!(copy.control_session_id, d.control_session_id);
805                    assert_eq!(copy.mtu_length, d.mtu_length);
806                    assert_eq!(copy.source_identity_length, d.source_identity_length);
807                },
808            )?;
809            archive.poll_for_recording_signals()?;
810            let err = archive.poll_for_error_response_as_string(4096)?;
811            if !err.is_empty() {
812                panic!("{}", err);
813            }
814        }
815        assert!(start.elapsed() < Duration::from_secs(5));
816        info!("start replay");
817        let params = AeronArchiveReplayParams::new(
818            0,
819            i32::MAX,
820            start_pos.get(),
821            end_pos.get() - start_pos.get(),
822            0,
823            0,
824        )?;
825        info!("replay params {:#?}", params);
826        let replay_stream_id = 45;
827        let replay_session_id =
828            archive.start_replay(found_recording_id.get(), channel, replay_stream_id, &params)?;
829        let session_id = replay_session_id as i32;
830
831        info!("replay session id {}", replay_session_id);
832        info!("session id {}", session_id);
833        let channel_replay =
834            format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
835        info!("archive id: {}", archive.get_archive_id());
836
837        info!("add subscription {:?}", channel_replay);
838        let subscription = aeron
839            .async_add_subscription(
840                &channel_replay,
841                replay_stream_id,
842                Some(&Handler::leak(AeronAvailableImageLogger)),
843                Some(&Handler::leak(AeronUnavailableImageLogger)),
844            )?
845            .poll_blocking(Duration::from_secs(10))?;
846
847        #[derive(Default)]
848        struct FragmentHandler {
849            count: Cell<usize>,
850        }
851
852        impl AeronFragmentHandlerCallback for FragmentHandler {
853            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
854                assert_eq!(buffer, "123456".as_bytes());
855
856                // Update count (using Cell for interior mutability)
857                self.count.set(self.count.get() + 1);
858            }
859        }
860
861        let poll = Handler::leak(FragmentHandler::default());
862
863        let start = Instant::now();
864        while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
865        {
866            let err = archive.poll_for_error_response_as_string(4096)?;
867            if !err.is_empty() {
868                panic!("{}", err);
869            }
870        }
871        assert!(
872            start.elapsed() < Duration::from_secs(10),
873            "messages not received {:?}",
874            poll.count
875        );
876        info!("aeron {:?}", aeron);
877        info!("ctx {:?}", archive_context);
878        assert_eq!(11, poll.count.get());
879        Ok(())
880    }
881
882    #[test]
883    #[serial]
884    fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
885        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
886        let archive_connector =
887            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
888        let archive = archive_connector
889            .poll_blocking(Duration::from_secs(30))
890            .expect("failed to connect to archive");
891
892        let invalid_channel = "invalid:channel".into_c_string();
893        let result =
894            archive.start_recording(&invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
895        assert!(
896            result.is_err(),
897            "Expected error when starting recording with an invalid channel"
898        );
899        Ok(())
900    }
901
902    #[test]
903    #[serial]
904    fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
905        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
906        let archive_connector =
907            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
908        let archive = archive_connector
909            .poll_blocking(Duration::from_secs(30))
910            .expect("failed to connect to archive");
911
912        let nonexistent_channel = &"aeron:udp?endpoint=localhost:9999".into_c_string();
913        let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
914        assert!(
915            result.is_err(),
916            "Expected error when stopping recording on a non-existent channel"
917        );
918        Ok(())
919    }
920
921    #[test]
922    #[serial]
923    fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
924        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
925        let archive_connector =
926            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
927        let archive = archive_connector
928            .poll_blocking(Duration::from_secs(30))
929            .expect("failed to connect to archive");
930
931        let invalid_recording_id = -999;
932        let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
933        let result = archive.start_replay(
934            invalid_recording_id,
935            &"aeron:udp?endpoint=localhost:8888".into_c_string(),
936            STREAM_ID,
937            &params,
938        );
939        assert!(
940            result.is_err(),
941            "Expected error when starting replay with an invalid recording id"
942        );
943        Ok(())
944    }
945
946    #[test]
947    #[serial]
948    fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
949        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
950        let archive_connector =
951            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
952        let archive = archive_connector
953            .poll_blocking(Duration::from_secs(30))
954            .expect("failed to connect to archive");
955
956        drop(archive);
957
958        let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
959        let new_archive = archive_connector
960            .poll_blocking(Duration::from_secs(30))
961            .expect("failed to reconnect to archive");
962        assert!(
963            new_archive.get_archive_id() > 0,
964            "Reconnected archive should have a valid archive id"
965        );
966
967        drop(media_driver);
968        Ok(())
969    }
970}