Aeron 是一个开源高性能消息传输机制(单向),支持高效可靠的 UDP 单播、UDP 多播和 IPC 消息传输。

1. 架构

Aeron 主要由三部分组成:Media Driver、Publications 和 Subscriptions


1.1 Media Driver

Media Driver 负责管理 publications 和 subscriptions 所使用的用来发送和接收数据的 Media(UDP 或 IPC)。

各组件作用:

  1. Driver ConductorDriver Conductor 负责接收来自 Aeron 客户端的 publishers 和 subscribers 的指令,并编排 Media Driver 的操作。另外还负责域名解析任务

  2. ReceiverReceiver 管理所有从 media 中接收到的数据,数据传输轮询器接收 UDP 数据,并使用 Java NIO 与操作系统的网络堆栈进行交互。除了从 media 接收数据外,接收器还管理接收到的 images,根据需要发送 NAK 和状态消息。

  3. SenderSender 通过 media 管理数据的传输

  4. Client ConductorClient Conductor 负责与 Driver Conductor 进行沟通

Media Driver 会在本地管理一个目录,在生产环境中这个目录应该放到 /dev/shm(共享内存)上。


Media Driver 线程模型:

  • DEDICATED(默认模式) 在专用线程模式中(ThreadingMode.DEDICATED)Media Driver 将使用 3 个线程,每个线程都有特定的空闲策略:Sender 使用 senderIdleStrategy;Receiver 使用 receiverIdleStrategy;driver-conductor 使用 conductorIdleStrategy

  • Shared Network 以共享网络模式(ThreadingMode.SHARED_NETWORK)运行将线程数减少到两个 Sender 和 Receiver 在一个复合代理中,使用 sharedNetworkIdleStrategydriver-conductor 使用 conductorIdleStrategy

  • Shared 以共享模式运行(ThreadingMode.SHARED),线程数减少到 1,使用 sharedIdleStrategy(适合资源较少的环境)

  • Invoker

1.2 Publications

Publications 是应用程序用来发送数据的主要 Aeron 对象,发送数据有两种方法:offer 和 tryClaim,这两种方法都是非阻塞的,具体有什么不同,在下面介绍。

Publications 有两种类型:

  • ConcurrentPublication 使用 Aeron.addPublication 方法创建 Publication 时,返回的就是 ConcurrentPublication,它是线程安全的,可以由多个 sender 共同使用

  • ExclusivePublicationExclusivePublication 是一个独占的 Publication,通过 Aeron.addExclusivePublication 来创建。ExclusivePublication 是线程不安全的,但是由于减少了很多内部锁,所以吞吐量会提高一些,如果想要使用一个初始 position 来重播的话,必须使用 ExclusivePublication 来实现。

Offer 使用 offer 方法发送数据时,应用程序可以提供一个 Byte Buffer 来发送给 Publication subscribers。offer 将数据从 Byte Buffer 复制到本地 Log Buffer 时,会发生数据拷贝,而使用 tryClaim 方法可以避免数据拷贝。

注意事项:

  1. 单条数据的大小不能超过 min(term buffer size / 8, 16M),term buffer size 默认是 16M;

  2. 单条数据大小超过 maxPayloadLength()(默认 1376 字节)时,Aeron 会自动对数据进行分段。(在 Subscriptions 端,Aeron 并不会自动组合数据,需要使用 FragmentAssembler 来实现)

TryClaimTryClaim 必须适合性能敏感的环境,它允许开发人员提前声明 Log Buffer 的一部分,然后直接写入数据到申请到的这段区域。这样就不需要在发送数据时复制数据了。TryClaim 写入数据后需要调用 commit 方法来完成。

注意事项:

  1. 消息的最大长度不能超过 maxPayloadLength()(默认 1376 字节);

  2. tryClaim 不支持数据分段

1.3. Subscriptions

Subscriptions 用来接收消息,通过 addSubscription 来创建一个针对于指定 channel 和 stream 的 subscription,然后还需要提供一个实现了 FragmentHandler 接口的对象来处理接收到的消息。

final Subscription subscription = aeron.addSubscription(channel, streamId);


通过调用 subscription.poll(fragmentHandler, 1) 来接收数据。

2. 其他核心概念

2.1. Channels, Streams and Sessions

创建 Publication & Subscription 组合时,必须指定 Channel 和 Stream ID,Channel 就很像 TCP/IP 的配置:IP 地址 + 端口号;但是 Aeron 可以在相同的 Channel 中发送不同的数据流–通过指定 Stream ID。sessionId 是一个随机的整数,可以是负值。


2.2. Log Buffers

Log Buffers 是一个内存映射文件,由四部分构成:

  • 三个大小相同的部分:term,每个 term 都有自己的 term id, term 用来保存 header 和消息数据

  • 元数据部分,位于文件末尾

term 有三种逻辑状态:

  • clean term 尚未写入数据

  • active term 正在写入数据

  • dirty 保存不处于活动状态但暂时可用于重新传输的数据

Term Buffer Lengthsterm 大小设置(默认 16M):系统属性 aeron.term.buffer.length 或 aeron.ipc.term.buffer.length 也可以在 Media Driver context 中用 publicationTermBufferLength 和 ipcTermBufferLength 配置可以针对单个 publication 配置大小:aeron:ipc?term-length=128k 或 aeron:udp?endpoint=192.168.0.1:12345|term-length=2m

条件限制:

  • term buffer 的长度最小限制为 65536 字节(64k)

  • term buffer 的长度最大限制为 1,073,741,824 字节(1G)

  • 大小必须是 2 的幂

  • 最大消息长度是 term buffer length/8 和 16M 中的较小值

  • 如何发送超过 8kb 的消息?

  • term buffer length 直接影响着最大消息长度,设置越大就可以发送更大的消息

  • tryClaim 不支持任何超出 maxPayloadLength(默认为 1376 字节)的内容

  • 注意 /dev/shm 的大小,需要进行相应的配置

3. Demo

Publisher:


Subscriber:


  1. MediaDriver 可以作为一个独立程序启动,也可以嵌入到应用程序中启动,这里是通过调用 MediaDriver.launch 方法在应用程序中启动。

  2. 定义 channle 和 streamId,这里是通过 IPC 来传输消息。

  3. 发送消息:发送端创建 publication,使用 tryClaim 方法发送消息

  4. 接收消息:接收端创建 subscription,实现了 FragmentHandler 接口来处理接收到的数据,通过调用 subscription.poll 来接收数据。

参考资料

https://github.com/real-logic/aeron/wiki






本文文字及图片出自 InfoQ

余下全文(1/3)
分享这篇文章:

请关注我们:

发表评论

邮箱地址不会被公开。 必填项已用*标注