1use crate::AeronErrorType::Unknown;
2#[cfg(feature = "backtrace")]
3use std::backtrace::Backtrace;
4use std::cell::UnsafeCell;
5use std::fmt::Formatter;
6use std::mem::MaybeUninit;
7use std::ops::{Deref, DerefMut};
8
9pub enum CResource<T> {
10 OwnedOnHeap(std::rc::Rc<ManagedCResource<T>>),
11 OwnedOnStack(std::mem::MaybeUninit<T>),
13 Borrowed(*mut T),
14}
15
16impl<T: Clone> Clone for CResource<T> {
17 fn clone(&self) -> Self {
18 unsafe {
19 match self {
20 CResource::OwnedOnHeap(r) => CResource::OwnedOnHeap(r.clone()),
21 CResource::OwnedOnStack(r) => {
22 CResource::OwnedOnStack(MaybeUninit::new(r.assume_init_ref().clone()))
23 }
24 CResource::Borrowed(r) => CResource::Borrowed(r.clone()),
25 }
26 }
27 }
28}
29
30impl<T> CResource<T> {
31 #[inline]
32 pub fn get(&self) -> *mut T {
33 match self {
34 CResource::OwnedOnHeap(r) => r.get(),
35 CResource::OwnedOnStack(r) => r.as_ptr() as *mut T,
36 CResource::Borrowed(r) => *r,
37 }
38 }
39
40 #[inline]
41 pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
43 match self {
44 CResource::OwnedOnHeap(r) => r.add_dependency(dep),
45 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => {
46 unreachable!("only owned on heap")
47 }
48 }
49 }
50 #[inline]
51 pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
52 match self {
53 CResource::OwnedOnHeap(r) => r.get_dependency(),
54 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
55 }
56 }
57
58 #[inline]
59 pub fn as_owned(&self) -> Option<&std::rc::Rc<ManagedCResource<T>>> {
60 match self {
61 CResource::OwnedOnHeap(r) => Some(r),
62 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
63 }
64 }
65}
66
67impl<T> std::fmt::Debug for CResource<T> {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 let name = std::any::type_name::<T>();
70
71 match self {
72 CResource::OwnedOnHeap(r) => {
73 write!(f, "{name} heap({:?})", r)
74 }
75 CResource::OwnedOnStack(r) => {
76 write!(f, "{name} stack({:?})", *r)
77 }
78 CResource::Borrowed(r) => {
79 write!(f, "{name} borrowed ({:?})", r)
80 }
81 }
82 }
83}
84
/// Wrapper around a raw pointer to a value of type `T`, bundled with the
/// callbacks and flags that control how it is closed.
#[allow(dead_code)]
pub struct ManagedCResource<T> {
    /// Raw pointer to the wrapped value; `close` resets it to null after cleanup.
    resource: *mut T,
    /// Close callback. It receives the address of `resource` and returns a
    /// status code; `close` treats negative values as failures.
    cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
    /// When `true`, `Drop` additionally reclaims `resource` via `Box::from_raw`.
    cleanup_struct: bool,
    /// Set as soon as `close` is entered, so the cleanup runs at most once.
    close_already_called: std::cell::Cell<bool>,
    /// Optional predicate reporting whether `resource` is already closed.
    check_for_is_closed: Option<fn(*mut T) -> bool>,
    /// Initialised to `false` by `new`; not read anywhere in the visible code.
    auto_close: std::cell::Cell<bool>,
    /// Values kept alive for as long as this resource exists.
    dependencies: UnsafeCell<Vec<std::rc::Rc<dyn std::any::Any>>>,
}

/// Prints the type name of `T` and, while the resource still looks open, the
/// raw pointer as well.
impl<T> std::fmt::Debug for ManagedCResource<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Short-circuit order matters: the predicate is only consulted for a
        // non-null pointer that has not been closed through `close`.
        let looks_open = !self.close_already_called.get()
            && !self.resource.is_null()
            && !matches!(self.check_for_is_closed, Some(is_closed) if is_closed(self.resource));

        let mut out = f.debug_struct("ManagedCResource");
        if looks_open {
            out.field("resource", &self.resource);
        }
        out.field("type", &std::any::type_name::<T>()).finish()
    }
}
126
127impl<T> ManagedCResource<T> {
128 pub fn new(
135 init: impl FnOnce(*mut *mut T) -> i32,
136 cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
137 cleanup_struct: bool,
138 check_for_is_closed: Option<fn(*mut T) -> bool>,
139 ) -> Result<Self, AeronCError> {
140 let resource = Self::initialise(init)?;
141
142 let result = Self {
143 resource,
144 cleanup,
145 cleanup_struct,
146 close_already_called: std::cell::Cell::new(false),
147 check_for_is_closed,
148 auto_close: std::cell::Cell::new(false),
149 dependencies: UnsafeCell::new(vec![]),
150 };
151 #[cfg(feature = "extra-logging")]
152 log::info!("created c resource: {:?}", result);
153 Ok(result)
154 }
155
156 pub fn initialise(
157 init: impl FnOnce(*mut *mut T) -> i32 + Sized,
158 ) -> Result<*mut T, AeronCError> {
159 let mut resource: *mut T = std::ptr::null_mut();
160 let result = init(&mut resource);
161 if result < 0 || resource.is_null() {
162 return Err(AeronCError::from_code(result));
163 }
164 Ok(resource)
165 }
166
167 pub fn is_closed_already_called(&self) -> bool {
168 self.close_already_called.get()
169 || self.resource.is_null()
170 || self
171 .check_for_is_closed
172 .as_ref()
173 .map_or(false, |f| f(self.resource))
174 }
175
176 #[inline(always)]
178 pub fn get(&self) -> *mut T {
179 self.resource
180 }
181
182 #[inline(always)]
183 pub fn get_mut(&self) -> &mut T {
184 unsafe { &mut *self.resource }
185 }
186
187 #[inline]
188 pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
190 if let Some(dep) =
191 (&dep as &dyn std::any::Any).downcast_ref::<std::rc::Rc<dyn std::any::Any>>()
192 {
193 unsafe {
194 (*self.dependencies.get()).push(dep.clone());
195 }
196 } else {
197 unsafe {
198 (*self.dependencies.get()).push(std::rc::Rc::new(dep));
199 }
200 }
201 }
202
203 #[inline]
204 pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
205 unsafe {
206 (*self.dependencies.get())
207 .iter()
208 .filter_map(|x| x.as_ref().downcast_ref::<V>().cloned())
209 .next()
210 }
211 }
212
213 pub fn close(&mut self) -> Result<(), AeronCError> {
217 if self.close_already_called.get() {
218 return Ok(());
219 }
220 self.close_already_called.set(true);
221
222 let already_closed = self
223 .check_for_is_closed
224 .as_ref()
225 .map_or(false, |f| f(self.resource));
226
227 if let Some(mut cleanup) = self.cleanup.take() {
228 if !self.resource.is_null() {
229 if !already_closed {
230 let result = cleanup(&mut self.resource);
231 if result < 0 {
232 return Err(AeronCError::from_code(result));
233 }
234 }
235 self.resource = std::ptr::null_mut();
236 }
237 }
238
239 Ok(())
240 }
241}
242
243impl<T> Drop for ManagedCResource<T> {
244 fn drop(&mut self) {
245 if !self.resource.is_null() {
246 let already_closed = self.close_already_called.get()
247 || self
248 .check_for_is_closed
249 .as_ref()
250 .map_or(false, |f| f(self.resource));
251
252 let resource = if already_closed {
253 self.resource
254 } else {
255 self.resource.clone()
256 };
257
258 if !already_closed {
259 #[cfg(feature = "extra-logging")]
261 log::info!("closing c resource: {:?}", self);
262 let _ = self.close(); }
264 self.close_already_called.set(true);
265
266 if self.cleanup_struct {
267 #[cfg(feature = "extra-logging")]
268 log::info!("closing rust struct resource: {:?}", resource);
269 unsafe {
270 let _ = Box::from_raw(resource);
271 }
272 }
273 }
274 }
275}
276
277#[derive(Debug, PartialOrd, Eq, PartialEq, Clone)]
278pub enum AeronErrorType {
279 GenericError,
280 ClientErrorDriverTimeout,
281 ClientErrorClientTimeout,
282 ClientErrorConductorServiceTimeout,
283 ClientErrorBufferFull,
284 PublicationBackPressured,
285 PublicationAdminAction,
286 PublicationClosed,
287 PublicationMaxPositionExceeded,
288 PublicationError,
289 TimedOut,
290 Unknown(i32),
291}
292
293impl From<AeronErrorType> for AeronCError {
294 fn from(value: AeronErrorType) -> Self {
295 AeronCError::from_code(value.code())
296 }
297}
298
299impl AeronErrorType {
300 pub fn code(&self) -> i32 {
301 match self {
302 AeronErrorType::GenericError => -1,
303 AeronErrorType::ClientErrorDriverTimeout => -1000,
304 AeronErrorType::ClientErrorClientTimeout => -1001,
305 AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
306 AeronErrorType::ClientErrorBufferFull => -1003,
307 AeronErrorType::PublicationBackPressured => -2,
308 AeronErrorType::PublicationAdminAction => -3,
309 AeronErrorType::PublicationClosed => -4,
310 AeronErrorType::PublicationMaxPositionExceeded => -5,
311 AeronErrorType::PublicationError => -6,
312 AeronErrorType::TimedOut => -234324,
313 AeronErrorType::Unknown(code) => *code,
314 }
315 }
316
317 pub fn is_back_pressured(&self) -> bool {
318 self == &AeronErrorType::PublicationBackPressured
319 }
320
321 pub fn is_admin_action(&self) -> bool {
322 self == &AeronErrorType::PublicationAdminAction
323 }
324
325 pub fn is_back_pressured_or_admin_action(&self) -> bool {
326 self.is_back_pressured() || self.is_admin_action()
327 }
328
329 pub fn from_code(code: i32) -> Self {
330 match code {
331 -1 => AeronErrorType::GenericError,
332 -1000 => AeronErrorType::ClientErrorDriverTimeout,
333 -1001 => AeronErrorType::ClientErrorClientTimeout,
334 -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
335 -1003 => AeronErrorType::ClientErrorBufferFull,
336 -2 => AeronErrorType::PublicationBackPressured,
337 -3 => AeronErrorType::PublicationAdminAction,
338 -4 => AeronErrorType::PublicationClosed,
339 -5 => AeronErrorType::PublicationMaxPositionExceeded,
340 -6 => AeronErrorType::PublicationError,
341 -234324 => AeronErrorType::TimedOut,
342 _ => Unknown(code),
343 }
344 }
345
346 pub fn to_string(&self) -> &'static str {
347 match self {
348 AeronErrorType::GenericError => "Generic Error",
349 AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
350 AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
351 AeronErrorType::ClientErrorConductorServiceTimeout => {
352 "Client Error Conductor Service Timeout"
353 }
354 AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
355 AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
356 AeronErrorType::PublicationAdminAction => "Publication Admin Action",
357 AeronErrorType::PublicationClosed => "Publication Closed",
358 AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
359 AeronErrorType::PublicationError => "Publication Error",
360 AeronErrorType::TimedOut => "Timed Out",
361 AeronErrorType::Unknown(_) => "Unknown Error",
362 }
363 }
364}
365
366#[derive(Eq, PartialEq, Clone)]
371pub struct AeronCError {
372 pub code: i32,
373}
374
375impl AeronCError {
376 pub fn from_code(code: i32) -> Self {
380 #[cfg(feature = "backtrace")]
381 {
382 if code < 0 {
383 let backtrace = Backtrace::capture();
384 let backtrace = format!("{:?}", backtrace);
385
386 let re =
387 regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
388 let mut lines = String::new();
389 re.captures_iter(&backtrace).for_each(|cap| {
390 let function = &cap[1];
391 let mut file = cap[2].to_string();
392 let line = &cap[3];
393 if file.starts_with("./") {
394 file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
395 } else if file.starts_with("/rustc/") {
396 file = file.split("/").last().unwrap().to_string();
397 }
398 lines.push_str(&format!(" {file}:{line} in {function}\n"));
400 });
401
402 log::error!(
403 "Aeron C error code: {}, kind: '{:?}'\n{}",
404 code,
405 AeronErrorType::from_code(code),
406 lines
407 );
408 }
409 }
410 AeronCError { code }
411 }
412
413 pub fn kind(&self) -> AeronErrorType {
414 AeronErrorType::from_code(self.code)
415 }
416
417 pub fn is_back_pressured(&self) -> bool {
418 self.kind().is_back_pressured()
419 }
420
421 pub fn is_admin_action(&self) -> bool {
422 self.kind().is_admin_action()
423 }
424
425 pub fn is_back_pressured_or_admin_action(&self) -> bool {
426 self.kind().is_back_pressured_or_admin_action()
427 }
428}
429
/// Raw-pointer wrapper around a handler value of type `T`.
///
/// No `Drop` impl is visible for this type: a handler created with
/// [`Handler::leak`] is only freed by an explicit [`Handler::release`].
pub struct Handler<T> {
    /// Pointer to the handler value; null means "no handler".
    raw_ptr: *mut T,
    /// Whether `release` is allowed to free `raw_ptr`.
    should_drop: bool,
}

// NOTE(review): both impls are unconditional (no `T: Send` / `T: Sync` bound),
// so thread-safety of the pointee is entirely the caller's responsibility.
unsafe impl<T> Send for Handler<T> {}
unsafe impl<T> Sync for Handler<T> {}

/// Field-less marker type; nothing in the visible part of the file uses it.
pub struct Handlers;

impl<T> Handler<T> {
    /// Moves `handler` to the heap and wraps the resulting pointer; the
    /// returned wrapper is flagged so that `release` frees it.
    pub fn leak(handler: T) -> Self {
        let raw_ptr = Box::into_raw(Box::new(handler));
        #[cfg(feature = "extra-logging")]
        log::info!("creating handler {:?}", raw_ptr);
        Self {
            raw_ptr,
            should_drop: true,
        }
    }

    /// Returns `true` when the wrapped pointer is null.
    pub fn is_none(&self) -> bool {
        self.raw_ptr.is_null()
    }

    /// Returns the wrapped pointer as an untyped `void*`.
    pub fn as_raw(&self) -> *mut std::os::raw::c_void {
        self.raw_ptr as *mut std::os::raw::c_void
    }

    /// Frees the handler if this wrapper owns it; later calls are no-ops.
    ///
    /// The stored pointer is left unchanged, so it dangles afterwards and
    /// must not be dereferenced.
    pub fn release(&mut self) {
        if !self.should_drop || self.raw_ptr.is_null() {
            return;
        }
        #[cfg(feature = "extra-logging")]
        log::info!("dropping handler {:?}", self.raw_ptr);
        // SAFETY: `should_drop` is set by `leak`, whose pointer comes from
        // `Box::into_raw`; callers of `new` must guarantee the same.
        unsafe {
            drop(Box::from_raw(self.raw_ptr));
        }
        self.should_drop = false;
    }

    /// Wraps an existing pointer.
    ///
    /// # Safety
    ///
    /// If `should_drop` is `true`, `raw_ptr` must be null or originate from
    /// `Box::into_raw`, because `release` frees it with `Box::from_raw`.
    /// Dereferencing the handler requires `raw_ptr` to point to a live `T`.
    pub unsafe fn new(raw_ptr: *mut T, should_drop: bool) -> Self {
        Self {
            raw_ptr,
            should_drop,
        }
    }
}

impl<T> Deref for Handler<T> {
    type Target = T;

    /// Dereferences the raw pointer without a null or liveness check.
    fn deref(&self) -> &Self::Target {
        unsafe { &*self.raw_ptr }
    }
}

impl<T> DerefMut for Handler<T> {
    /// Dereferences the raw pointer without a null or liveness check.
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe { &mut *self.raw_ptr }
    }
}
510
511pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
512 let end_port = u16::MAX;
513
514 for port in start_port..=end_port {
515 if is_udp_port_available(port) {
516 return Some(port);
517 }
518 }
519
520 None
521}
522
/// Reports whether a UDP socket can currently be bound to `127.0.0.1:port`.
pub fn is_udp_port_available(port: u16) -> bool {
    match std::net::UdpSocket::bind(("127.0.0.1", port)) {
        // The probe socket is dropped right away; only the outcome matters.
        Ok(_probe) => true,
        Err(_) => false,
    }
}
526
/// Field-less type that serves as a namespace for channel-URI constants.
pub struct ChannelUri {}

impl ChannelUri {
    /// Scheme string: `"aeron"`.
    pub const AERON_SCHEME: &'static str = "aeron";
    /// Spy qualifier string: `"aeron-spy"`.
    pub const SPY_QUALIFIER: &'static str = "aeron-spy";
    /// Going by the name, the maximum length of a URI.
    // NOTE(review): unit (bytes vs characters) is not visible here - confirm.
    pub const MAX_URI_LENGTH: usize = 4095;
}
535
/// Default driver timeout; the `_MS` in the name suggests milliseconds.
pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
/// Property name `"aeron.dir"`.
pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
/// Channel prefix for the IPC media: `"aeron:ipc"`.
pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
/// Channel prefix for the UDP media: `"aeron:udp"`.
pub const AERON_UDP_MEDIA: &str = "aeron:udp";
/// Prefix string `"aeron-spy:"` (note the trailing colon).
pub const SPY_PREFIX: &str = "aeron-spy:";
/// Prefix string `"tag:"`.
pub const TAG_PREFIX: &str = "tag:";
542
/// Media kinds with a string form: IPC or UDP.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Media {
    Ipc,
    Udp,
}

impl Media {
    /// Returns the lowercase name of the media: `"ipc"` or `"udp"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Ipc => "ipc",
            Self::Udp => "udp",
        }
    }
}
558
/// Control modes with a string form: manual, dynamic or response.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ControlMode {
    Manual,
    Dynamic,
    Response,
}

impl ControlMode {
    /// Returns the lowercase name of the mode: `"manual"`, `"dynamic"` or
    /// `"response"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Manual => "manual",
            Self::Dynamic => "dynamic",
            Self::Response => "response",
        }
    }
}
577
/// Test-only global allocator that tracks how many allocations are live.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) mod test_alloc {
    use std::alloc::{GlobalAlloc, Layout, System};
    use std::sync::atomic::{AtomicIsize, Ordering};

    /// Forwards every request to [`System`] while keeping a running balance
    /// of allocations minus deallocations.
    pub struct CountingAllocator {
        allocs: AtomicIsize,
    }

    impl CountingAllocator {
        /// Creates an allocator whose balance starts at zero.
        pub const fn new() -> Self {
            Self {
                allocs: AtomicIsize::new(0),
            }
        }

        /// Current balance of allocations minus deallocations.
        fn current(&self) -> isize {
            self.allocs.load(Ordering::SeqCst)
        }
    }

    unsafe impl GlobalAlloc for CountingAllocator {
        unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
            // Counted before delegating, regardless of whether it succeeds.
            self.allocs.fetch_add(1, Ordering::SeqCst);
            System.alloc(layout)
        }

        unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
            self.allocs.fetch_sub(1, Ordering::SeqCst);
            System.dealloc(ptr, layout)
        }
    }

    #[global_allocator]
    static GLOBAL: CountingAllocator = CountingAllocator::new();

    /// Number of allocations currently live according to the global counter.
    pub fn current_allocs() -> isize {
        GLOBAL.current()
    }
}
623
/// Conversion of string-like values into an owned [`std::ffi::CString`].
pub trait IntoCString {
    /// Consumes `self` and returns it as a `CString`.
    fn into_c_string(self) -> std::ffi::CString;
}

/// A `CString` is returned unchanged.
impl IntoCString for std::ffi::CString {
    fn into_c_string(self) -> std::ffi::CString {
        self
    }
}

/// Copies the string slice into a new `CString`.
///
/// # Panics
///
/// Panics if the text contains an interior NUL byte.
impl IntoCString for &str {
    fn into_c_string(self) -> std::ffi::CString {
        #[cfg(feature = "extra-logging")]
        log::info!("created c string on heap: {:?}", self);

        let converted = std::ffi::CString::new(self);
        converted.expect("failed to create CString")
    }
}

/// Converts the owned string into a `CString`.
///
/// # Panics
///
/// Panics if the text contains an interior NUL byte.
impl IntoCString for String {
    fn into_c_string(self) -> std::ffi::CString {
        #[cfg(feature = "extra-logging")]
        log::info!("created c string on heap: {:?}", self);

        let converted = std::ffi::CString::new(self);
        converted.expect("failed to create CString")
    }
}