1use crate::AeronErrorType::Unknown;
2#[cfg(feature = "backtrace")]
3use std::backtrace::Backtrace;
4use std::cell::UnsafeCell;
5use std::fmt::Formatter;
6use std::mem::MaybeUninit;
7use std::ops::{Deref, DerefMut};
8
/// Handle to a `T` that is ultimately reached through a raw pointer.
///
/// The variants differ in where the pointee is kept; `CResource::get`
/// produces a `*mut T` for every one of them.
pub enum CResource<T> {
    /// Reference-counted [`ManagedCResource`] that wraps the pointer.
    OwnedOnHeap(std::rc::Rc<ManagedCResource<T>>),
    /// Value stored inline in a `MaybeUninit<T>` slot.
    // NOTE(review): the `Clone` impl calls `assume_init_ref` on this slot, so
    // it appears every constructor must initialise it — confirm at call sites.
    OwnedOnStack(std::mem::MaybeUninit<T>),
    /// Plain raw pointer; this enum attaches no ownership bookkeeping to it.
    Borrowed(*mut T),
}
15
16impl<T: Clone> Clone for CResource<T> {
17 fn clone(&self) -> Self {
18 unsafe {
19 match self {
20 CResource::OwnedOnHeap(r) => CResource::OwnedOnHeap(r.clone()),
21 CResource::OwnedOnStack(r) => {
22 CResource::OwnedOnStack(MaybeUninit::new(r.assume_init_ref().clone()))
23 }
24 CResource::Borrowed(r) => CResource::Borrowed(r.clone()),
25 }
26 }
27 }
28}
29
30impl<T> CResource<T> {
31 #[inline]
32 pub fn get(&self) -> *mut T {
33 match self {
34 CResource::OwnedOnHeap(r) => r.get(),
35 CResource::OwnedOnStack(r) => r.as_ptr() as *mut T,
36 CResource::Borrowed(r) => *r,
37 }
38 }
39
40 #[inline]
41 pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
43 match self {
44 CResource::OwnedOnHeap(r) => r.add_dependency(dep),
45 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => {
46 unreachable!("only owned on heap")
47 }
48 }
49 }
50 #[inline]
51 pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
52 match self {
53 CResource::OwnedOnHeap(r) => r.get_dependency(),
54 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
55 }
56 }
57
58 #[inline]
59 pub fn as_owned(&self) -> Option<&std::rc::Rc<ManagedCResource<T>>> {
60 match self {
61 CResource::OwnedOnHeap(r) => Some(r),
62 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
63 }
64 }
65}
66
67impl<T> std::fmt::Debug for CResource<T> {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 let name = std::any::type_name::<T>();
70
71 match self {
72 CResource::OwnedOnHeap(r) => {
73 write!(f, "{name} heap({:?})", r)
74 }
75 CResource::OwnedOnStack(r) => {
76 write!(f, "{name} stack({:?})", *r)
77 }
78 CResource::Borrowed(r) => {
79 write!(f, "{name} borrowed ({:?})", r)
80 }
81 }
82 }
83}
84
/// Owner of a raw `*mut T` together with the hooks and flags used to close it.
#[allow(dead_code)]
pub struct ManagedCResource<T> {
    /// Raw pointer filled in by the `init` callback given to `new`; `close`
    /// resets it to null once the cleanup hook has been consumed.
    resource: *mut T,
    /// Optional cleanup hook. `close` takes it out of the option and calls it
    /// with a pointer to `resource`; a negative return value becomes an error.
    cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
    /// When `true`, `Drop` reclaims the pointee with `Box::from_raw`.
    cleanup_struct: bool,
    /// Set by `close` and by `Drop` so the cleanup hook is not run twice.
    close_already_called: std::cell::Cell<bool>,
    /// Optional predicate asked whether the resource is already closed; when
    /// it answers `true` the cleanup hook is skipped.
    check_for_is_closed: Option<fn(*mut T) -> bool>,
    /// Initialised to `false` by `new`.
    // NOTE(review): never read in the code visible here — confirm where it is used.
    auto_close: std::cell::Cell<bool>,
    /// Flag read by `is_resource_released` and set by `mark_resource_released`.
    resource_released: std::cell::Cell<bool>,
    /// Values registered through `add_dependency`, each held as `Rc<dyn Any>`.
    dependencies: UnsafeCell<Vec<std::rc::Rc<dyn std::any::Any>>>,
}
108
109impl<T> std::fmt::Debug for ManagedCResource<T> {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 let mut debug_struct = f.debug_struct("ManagedCResource");
112
113 if !self.close_already_called.get()
114 && !self.resource.is_null()
115 && !self
116 .check_for_is_closed
117 .as_ref()
118 .map_or(false, |f| f(self.resource))
119 {
120 debug_struct.field("resource", &self.resource);
121 }
122
123 debug_struct
124 .field("type", &std::any::type_name::<T>())
125 .finish()
126 }
127}
128
129impl<T> ManagedCResource<T> {
130 pub fn new(
137 init: impl FnOnce(*mut *mut T) -> i32,
138 cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
139 cleanup_struct: bool,
140 check_for_is_closed: Option<fn(*mut T) -> bool>,
141 ) -> Result<Self, AeronCError> {
142 let resource = Self::initialise(init)?;
143
144 let result = Self {
145 resource,
146 cleanup,
147 cleanup_struct,
148 close_already_called: std::cell::Cell::new(false),
149 check_for_is_closed,
150 auto_close: std::cell::Cell::new(false),
151 resource_released: std::cell::Cell::new(false),
152 dependencies: UnsafeCell::new(vec![]),
153 };
154 #[cfg(feature = "extra-logging")]
155 log::info!("created c resource: {:?}", result);
156 Ok(result)
157 }
158
159 pub fn initialise(
160 init: impl FnOnce(*mut *mut T) -> i32 + Sized,
161 ) -> Result<*mut T, AeronCError> {
162 let mut resource: *mut T = std::ptr::null_mut();
163 let result = init(&mut resource);
164 if result < 0 || resource.is_null() {
165 return Err(AeronCError::from_code(result));
166 }
167 Ok(resource)
168 }
169
170 pub fn is_closed_already_called(&self) -> bool {
171 self.close_already_called.get()
172 || self.resource.is_null()
173 || self
174 .check_for_is_closed
175 .as_ref()
176 .map_or(false, |f| f(self.resource))
177 }
178
179 #[inline(always)]
181 pub fn get(&self) -> *mut T {
182 self.resource
183 }
184
185 #[inline(always)]
186 pub fn get_mut(&self) -> &mut T {
187 unsafe { &mut *self.resource }
188 }
189
190 #[inline]
191 pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
193 if let Some(dep) =
194 (&dep as &dyn std::any::Any).downcast_ref::<std::rc::Rc<dyn std::any::Any>>()
195 {
196 unsafe {
197 (*self.dependencies.get()).push(dep.clone());
198 }
199 } else {
200 unsafe {
201 (*self.dependencies.get()).push(std::rc::Rc::new(dep));
202 }
203 }
204 }
205
206 #[inline]
207 pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
208 unsafe {
209 (*self.dependencies.get())
210 .iter()
211 .filter_map(|x| x.as_ref().downcast_ref::<V>().cloned())
212 .next()
213 }
214 }
215
216 #[inline]
217 pub fn is_resource_released(&self) -> bool {
218 self.resource_released.get()
219 }
220
221 #[inline]
222 pub fn mark_resource_released(&self) {
223 self.resource_released.set(true);
224 }
225
226 pub fn close(&mut self) -> Result<(), AeronCError> {
230 if self.close_already_called.get() {
231 return Ok(());
232 }
233 self.close_already_called.set(true);
234
235 let already_closed = self
236 .check_for_is_closed
237 .as_ref()
238 .map_or(false, |f| f(self.resource));
239
240 if let Some(mut cleanup) = self.cleanup.take() {
241 if !self.resource.is_null() {
242 if !already_closed {
243 let result = cleanup(&mut self.resource);
244 if result < 0 {
245 return Err(AeronCError::from_code(result));
246 }
247 }
248 self.resource = std::ptr::null_mut();
249 }
250 }
251
252 Ok(())
253 }
254}
255
256impl<T> Drop for ManagedCResource<T> {
257 fn drop(&mut self) {
258 if !self.resource.is_null() {
259 let already_closed = self.close_already_called.get()
260 || self
261 .check_for_is_closed
262 .as_ref()
263 .map_or(false, |f| f(self.resource));
264
265 let resource = if already_closed {
266 self.resource
267 } else {
268 self.resource.clone()
269 };
270
271 if !already_closed {
272 #[cfg(feature = "extra-logging")]
274 log::info!("closing c resource: {:?}", self);
275 let _ = self.close(); }
277 self.close_already_called.set(true);
278
279 if self.cleanup_struct {
280 #[cfg(feature = "extra-logging")]
281 log::info!("closing rust struct resource: {:?}", resource);
282 unsafe {
283 let _ = Box::from_raw(resource);
284 }
285 }
286 }
287 }
288}
289
290#[derive(Debug, PartialOrd, Eq, PartialEq, Clone)]
291pub enum AeronErrorType {
292 GenericError,
293 ClientErrorDriverTimeout,
294 ClientErrorClientTimeout,
295 ClientErrorConductorServiceTimeout,
296 ClientErrorBufferFull,
297 PublicationBackPressured,
298 PublicationAdminAction,
299 PublicationClosed,
300 PublicationMaxPositionExceeded,
301 PublicationError,
302 TimedOut,
303 Unknown(i32),
304}
305
306impl From<AeronErrorType> for AeronCError {
307 fn from(value: AeronErrorType) -> Self {
308 AeronCError::from_code(value.code())
309 }
310}
311
312impl AeronErrorType {
313 pub fn code(&self) -> i32 {
314 match self {
315 AeronErrorType::GenericError => -1,
316 AeronErrorType::ClientErrorDriverTimeout => -1000,
317 AeronErrorType::ClientErrorClientTimeout => -1001,
318 AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
319 AeronErrorType::ClientErrorBufferFull => -1003,
320 AeronErrorType::PublicationBackPressured => -2,
321 AeronErrorType::PublicationAdminAction => -3,
322 AeronErrorType::PublicationClosed => -4,
323 AeronErrorType::PublicationMaxPositionExceeded => -5,
324 AeronErrorType::PublicationError => -6,
325 AeronErrorType::TimedOut => -234324,
326 AeronErrorType::Unknown(code) => *code,
327 }
328 }
329
330 pub fn is_back_pressured(&self) -> bool {
331 self == &AeronErrorType::PublicationBackPressured
332 }
333
334 pub fn is_admin_action(&self) -> bool {
335 self == &AeronErrorType::PublicationAdminAction
336 }
337
338 pub fn is_back_pressured_or_admin_action(&self) -> bool {
339 self.is_back_pressured() || self.is_admin_action()
340 }
341
342 pub fn from_code(code: i32) -> Self {
343 match code {
344 -1 => AeronErrorType::GenericError,
345 -1000 => AeronErrorType::ClientErrorDriverTimeout,
346 -1001 => AeronErrorType::ClientErrorClientTimeout,
347 -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
348 -1003 => AeronErrorType::ClientErrorBufferFull,
349 -2 => AeronErrorType::PublicationBackPressured,
350 -3 => AeronErrorType::PublicationAdminAction,
351 -4 => AeronErrorType::PublicationClosed,
352 -5 => AeronErrorType::PublicationMaxPositionExceeded,
353 -6 => AeronErrorType::PublicationError,
354 -234324 => AeronErrorType::TimedOut,
355 _ => Unknown(code),
356 }
357 }
358
359 pub fn to_string(&self) -> &'static str {
360 match self {
361 AeronErrorType::GenericError => "Generic Error",
362 AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
363 AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
364 AeronErrorType::ClientErrorConductorServiceTimeout => {
365 "Client Error Conductor Service Timeout"
366 }
367 AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
368 AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
369 AeronErrorType::PublicationAdminAction => "Publication Admin Action",
370 AeronErrorType::PublicationClosed => "Publication Closed",
371 AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
372 AeronErrorType::PublicationError => "Publication Error",
373 AeronErrorType::TimedOut => "Timed Out",
374 AeronErrorType::Unknown(_) => "Unknown Error",
375 }
376 }
377}
378
379#[derive(Eq, PartialEq, Clone)]
384pub struct AeronCError {
385 pub code: i32,
386}
387
388impl AeronCError {
389 pub fn from_code(code: i32) -> Self {
393 #[cfg(feature = "backtrace")]
394 {
395 if code < 0 {
396 let backtrace = Backtrace::capture();
397 let backtrace = format!("{:?}", backtrace);
398
399 let re =
400 regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
401 let mut lines = String::new();
402 re.captures_iter(&backtrace).for_each(|cap| {
403 let function = &cap[1];
404 let mut file = cap[2].to_string();
405 let line = &cap[3];
406 if file.starts_with("./") {
407 file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
408 } else if file.starts_with("/rustc/") {
409 file = file.split("/").last().unwrap().to_string();
410 }
411 lines.push_str(&format!(" {file}:{line} in {function}\n"));
413 });
414
415 log::error!(
416 "Aeron C error code: {}, kind: '{:?}'\n{}",
417 code,
418 AeronErrorType::from_code(code),
419 lines
420 );
421 }
422 }
423 AeronCError { code }
424 }
425
426 pub fn kind(&self) -> AeronErrorType {
427 AeronErrorType::from_code(self.code)
428 }
429
430 pub fn is_back_pressured(&self) -> bool {
431 self.kind().is_back_pressured()
432 }
433
434 pub fn is_admin_action(&self) -> bool {
435 self.kind().is_admin_action()
436 }
437
438 pub fn is_back_pressured_or_admin_action(&self) -> bool {
439 self.kind().is_back_pressured_or_admin_action()
440 }
441}
442
/// Wrapper around a raw `*mut T` that can be passed to C code as an opaque
/// `void*` (see [`Handler::as_raw`]).
///
/// There is no `Drop` impl in the code shown here: an owning handler created
/// with [`Handler::leak`] keeps its value alive until [`Handler::release`] is
/// called explicitly.
pub struct Handler<T> {
    /// Pointer to the wrapped value; null once an owned value was released.
    raw_ptr: *mut T,
    /// Whether [`Handler::release`] is responsible for freeing `raw_ptr`.
    should_drop: bool,
}

// NOTE(review): both impls are unconditional (no `T: Send` / `T: Sync`
// bound), so soundness rests on how handlers are used by callers.
unsafe impl<T> Send for Handler<T> {}
unsafe impl<T> Sync for Handler<T> {}

/// Unit type with no fields; it has no methods in the code visible here.
pub struct Handlers;

impl<T> Handler<T> {
    /// Moves `handler` onto the heap and returns an owning handle to it.
    ///
    /// The allocation stays alive until [`release`](Self::release) is called.
    pub fn leak(handler: T) -> Self {
        let raw_ptr = Box::into_raw(Box::new(handler));
        #[cfg(feature = "extra-logging")]
        log::info!("creating handler {:?}", raw_ptr);
        Self {
            raw_ptr,
            should_drop: true,
        }
    }

    /// Returns `true` when the handle holds a null pointer, which includes an
    /// owning handle after [`release`](Self::release).
    pub fn is_none(&self) -> bool {
        self.raw_ptr.is_null()
    }

    /// Returns the pointer as an untyped `void*`.
    pub fn as_raw(&self) -> *mut std::os::raw::c_void {
        self.raw_ptr.cast::<std::os::raw::c_void>()
    }

    /// Frees the wrapped value if this handle owns it.
    ///
    /// Does nothing for non-owning or null handles, and is safe to call
    /// repeatedly. After an owned value has been freed the stored pointer is
    /// reset to null, so [`is_none`](Self::is_none) becomes `true` and
    /// [`as_raw`](Self::as_raw) no longer returns the freed address.
    pub fn release(&mut self) {
        if !self.should_drop || self.raw_ptr.is_null() {
            return;
        }
        #[cfg(feature = "extra-logging")]
        log::info!("dropping handler {:?}", self.raw_ptr);
        // SAFETY: `should_drop` is only `true` while the pointer has not been
        // freed by this method; for handles built with `new`, the caller
        // promised the pointer came from `Box::into_raw`.
        unsafe {
            drop(Box::from_raw(self.raw_ptr));
        }
        self.should_drop = false;
        // Do not keep a dangling pointer around after the free.
        self.raw_ptr = std::ptr::null_mut();
    }

    /// Wraps an existing raw pointer.
    ///
    /// # Safety
    /// `raw_ptr` must be valid for every later dereference through this
    /// handle. When `should_drop` is `true` it must additionally come from
    /// `Box::into_raw`, because [`release`](Self::release) frees it with
    /// `Box::from_raw`.
    pub unsafe fn new(raw_ptr: *mut T, should_drop: bool) -> Self {
        Self {
            raw_ptr,
            should_drop,
        }
    }
}

impl<T> Deref for Handler<T> {
    type Target = T;

    /// Dereferences the raw pointer without a null check.
    fn deref(&self) -> &Self::Target {
        // SAFETY: relies on the caller not dereferencing a null or released handle.
        unsafe { &*self.raw_ptr }
    }
}

impl<T> DerefMut for Handler<T> {
    /// Mutably dereferences the raw pointer without a null check.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // SAFETY: relies on the caller not dereferencing a null or released handle.
        unsafe { &mut *self.raw_ptr }
    }
}
523
524pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
525 let end_port = u16::MAX;
526
527 for port in start_port..=end_port {
528 if is_udp_port_available(port) {
529 return Some(port);
530 }
531 }
532
533 None
534}
535
/// Returns `true` when a UDP socket can be bound to `127.0.0.1:port` right now.
///
/// The probe socket is closed again before returning, so the result is only
/// a snapshot.
pub fn is_udp_port_available(port: u16) -> bool {
    let probe = std::net::UdpSocket::bind(("127.0.0.1", port));
    probe.is_ok()
}
539
/// Field-less type used as a namespace for channel URI constants.
pub struct ChannelUri {}

impl ChannelUri {
    /// The string `"aeron"`.
    pub const AERON_SCHEME: &'static str = "aeron";
    /// The string `"aeron-spy"`.
    pub const SPY_QUALIFIER: &'static str = "aeron-spy";
    /// Numeric limit of 4095; presumably the maximum URI length in bytes — TODO confirm the unit.
    pub const MAX_URI_LENGTH: usize = 4095;
}
548
/// Default driver timeout; the `_MS` suffix suggests milliseconds (i.e. 10 s) — confirm against users.
pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
/// Property name `aeron.dir`.
pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
/// The string `aeron:ipc`; presumably the channel prefix for IPC media.
pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
/// The string `aeron:udp`; presumably the channel prefix for UDP media.
pub const AERON_UDP_MEDIA: &str = "aeron:udp";
/// The string `aeron-spy:` (note the trailing colon).
pub const SPY_PREFIX: &str = "aeron-spy:";
/// The string `tag:` (note the trailing colon).
pub const TAG_PREFIX: &str = "tag:";
555
/// Media kinds, each with a lowercase textual form (see [`Media::as_str`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Media {
    Ipc,
    Udp,
}

impl Media {
    /// Returns `"ipc"` or `"udp"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Ipc => "ipc",
            Self::Udp => "udp",
        }
    }
}
571
/// Control modes, each with a lowercase textual form (see [`ControlMode::as_str`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ControlMode {
    Manual,
    Dynamic,
    Response,
}

impl ControlMode {
    /// Returns `"manual"`, `"dynamic"` or `"response"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Manual => "manual",
            Self::Dynamic => "dynamic",
            Self::Response => "response",
        }
    }
}
590
#[cfg(test)]
#[allow(dead_code)]
pub(crate) mod test_alloc {
    use std::alloc::{GlobalAlloc, Layout, System};
    use std::sync::atomic::{AtomicIsize, Ordering};

    /// Allocator that forwards to [`System`] while keeping a running count of
    /// allocations minus deallocations.
    pub struct CountingAllocator {
        allocs: AtomicIsize,
    }

    impl CountingAllocator {
        /// Creates an allocator whose counter starts at zero.
        pub const fn new() -> Self {
            let allocs = AtomicIsize::new(0);
            Self { allocs }
        }

        /// Current value of the counter.
        fn current(&self) -> isize {
            self.allocs.load(Ordering::SeqCst)
        }

        /// Adds `delta` to the counter (`+1` per alloc, `-1` per dealloc).
        fn adjust(&self, delta: isize) {
            self.allocs.fetch_add(delta, Ordering::SeqCst);
        }
    }

    unsafe impl GlobalAlloc for CountingAllocator {
        unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
            // Counted before delegating, whether or not the allocation succeeds.
            self.adjust(1);
            System.alloc(layout)
        }

        unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
            self.adjust(-1);
            System.dealloc(ptr, layout)
        }
    }

    #[global_allocator]
    static GLOBAL: CountingAllocator = CountingAllocator::new();

    /// Number of allocations made through the global allocator that have not
    /// been deallocated yet.
    pub fn current_allocs() -> isize {
        GLOBAL.current()
    }
}
636
/// Conversion of owned or borrowed string-like values into a `CString`.
pub trait IntoCString {
    /// Consumes `self` and returns the C string.
    fn into_c_string(self) -> std::ffi::CString;
}

/// A `CString` is returned unchanged.
impl IntoCString for std::ffi::CString {
    fn into_c_string(self) -> std::ffi::CString {
        self
    }
}

/// Copies the slice into a newly allocated `CString`.
///
/// # Panics
/// Panics when the text contains an interior NUL byte.
impl IntoCString for &str {
    fn into_c_string(self) -> std::ffi::CString {
        #[cfg(feature = "extra-logging")]
        log::info!("created c string on heap: {:?}", self);

        let converted = std::ffi::CString::new(self);
        converted.expect("failed to create CString")
    }
}

/// Converts the owned string into a `CString`.
///
/// # Panics
/// Panics when the text contains an interior NUL byte.
impl IntoCString for String {
    fn into_c_string(self) -> std::ffi::CString {
        #[cfg(feature = "extra-logging")]
        log::info!("created c string on heap: {:?}", self);

        let converted = std::ffi::CString::new(self);
        converted.expect("failed to create CString")
    }
}