经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » C++ » 查看文章
iceoryx源码阅读(四)——共享内存通信(二)
来源:cnblogs  作者:爱新觉罗·L  时间:2024/5/29 9:09:56  对本文有异议

0 导引

本文阅读与共享内存通信相关的逻辑。发布者首先获取一块共享内存,往其中写入数据,然后向消息队列中推入消息描述数据,订阅者从消息队列中读取消息描述数据。本文从四方面进行解读:队列数据结构、共享内存获取、消息发送逻辑、消息接收逻辑。

1 队列数据结构

根据前文知道,队列元素为ShmSafeUnmanagedChunk,其中存放的是ChunkManagement所在共享内存段的id和相对该共享内存首地址的偏移,具体如下所示:

image

消息队列由如下代码定义:

  1. struct ChunkQueueData : public LockingPolicy
  2. {
  3. // ...
  4. static constexpr uint64_t MAX_CAPACITY = ChunkQueueDataProperties_t::MAX_QUEUE_CAPACITY;
  5. cxx::VariantQueue<mepoo::ShmSafeUnmanagedChunk, MAX_CAPACITY> m_queue;
  6. // ...
  7. };
  8. struct ChunkDistributorData : public LockingPolicy
  9. {
  10. // ...
  11. using QueueContainer_t =
  12. cxx::vector<memory::RelativePointer<ChunkQueueData_t>, ChunkDistributorDataProperties_t::MAX_QUEUES>;
  13. QueueContainer_t m_queues;
  14. // ...
  15. };
  16. struct ChunkReceiverData : public ChunkQueueDataType
  17. {
  18. // ...
  19. };
  • ChunkDistributorData是发布者所持有的队列数据结构,由于一个发布者会分发至多个订阅端,所以持有多个队列。

  • ChunkReceiverData是订阅者的组件,它继承自ChunkQueueData,内部只有一个队列,队列元素类型为ShmSafeUnmanagedChunk

上述代码中,队列数据结构的类型为cxx::VariantQueue<mepoo::ShmSafeUnmanagedChunk, MAX_CAPACITY>。从类名看,是一个变长数组,但实际上这是一个定长数组,以下是相关数据结构定义:

  1. enum class VariantQueueTypes : uint64_t
  2. {
  3. FiFo_SingleProducerSingleConsumer = 0,
  4. SoFi_SingleProducerSingleConsumer = 1,
  5. FiFo_MultiProducerSingleConsumer = 2,
  6. SoFi_MultiProducerSingleConsumer = 3
  7. };
  8. template <typename ValueType, uint64_t Capacity>
  9. class VariantQueue
  10. {
  11. public:
  12. using fifo_t = variant<concurrent::FiFo<ValueType, Capacity>,
  13. concurrent::SoFi<ValueType, Capacity>,
  14. concurrent::ResizeableLockFreeQueue<ValueType, Capacity>,
  15. concurrent::ResizeableLockFreeQueue<ValueType, Capacity>>;
  16. // ...
  17. private:
  18. VariantQueueTypes m_type;
  19. fifo_t m_fifo;
  20. };

fifo_t是队列底层结构类型,可能是concurrent::FiFoconcurrent::SoFiconcurrent::ResizeableLockFreeQueue之一,至于使用哪一种,由枚举值m_type确定。这三个内部会依赖以下数据结构:

  1. template <typename ElementType, uint64_t Capacity>
  2. struct NonZeroedBuffer
  3. {
  4. struct alignas(ElementType) element_t
  5. {
  6. cxx::byte_t data[sizeof(ElementType)];
  7. };
  8. element_t value[Capacity];
  9. };

上面这一结构本质就是一个数组,其元素类型类型为Element。

2 共享内存获取

发送数据前,应用程序首先需要先获取一块合适大小的Chunk,往其中写入数据,然后调用消息发送接口进行发送。

2.1 PublisherImpl::loan

职责:

获取一块共享内存,并调用构造函数进行初始化。

入参:

args:模板变参,用于调用待传类型的构造函数,也可以不传。

返回:

Sample类型实例,本质是对用户可操作的共享内存段的封装。

  1. template <typename T, typename H, typename BasePublisherType>
  2. template <typename... Args>
  3. inline cxx::expected<Sample<T, H>, AllocationError>
  4. PublisherImpl<T, H, BasePublisherType>::loan(Args&&... args) noexcept
  5. {
  6. return std::move(loanSample().and_then([&](auto& sample) { new (sample.get()) T(std::forward<Args>(args)...); }));
  7. }

整体代码分析:

首先调用loanSample方法获取共享内存,然后调用构造函数进行初始化,这里使用Placement new语法。需要指出的是,loanSample返回的是将用于存放用户数据的首地址,而不是Chunk的首地址。

2.2 PublisherImpl::loanSample

职责:

分配共享内存,并将其转换为Sample类型,并返回。

返回:

Sample类型实例。

  1. template <typename T, typename H, typename BasePublisherType>
  2. inline cxx::expected<Sample<T, H>, AllocationError> PublisherImpl<T, H, BasePublisherType>::loanSample() noexcept
  3. {
  4. static constexpr uint32_t USER_HEADER_SIZE{std::is_same<H, mepoo::NoUserHeader>::value ? 0U : sizeof(H)};
  5. auto result = port().tryAllocateChunk(sizeof(T), alignof(T), USER_HEADER_SIZE, alignof(H));
  6. if (result.has_error())
  7. {
  8. return cxx::error<AllocationError>(result.get_error());
  9. }
  10. else
  11. {
  12. return cxx::success<Sample<T, H>>(convertChunkHeaderToSample(result.value()));
  13. }
  14. }

整体代码分析:

首先调用tryAllocateChunk获得一块共享内存,并构造Sample实例。

2.3 PublisherPortUser::tryAllocateChunk

职责:

分配共享内存,并将其转换为Sample类型,并返回。

入参:

4个用于计算所需共享内存大小的参数,这里不展开介绍了。

返回值:

共享内存首地址(类型为ChunkHeader *,见4.1 Chunk管理结构

  1. cxx::expected<mepoo::ChunkHeader*, AllocationError>
  2. PublisherPortUser::tryAllocateChunk(const uint32_t userPayloadSize,
  3. const uint32_t userPayloadAlignment,
  4. const uint32_t userHeaderSize,
  5. const uint32_t userHeaderAlignment) noexcept
  6. {
  7. return m_chunkSender.tryAllocate(
  8. getUniqueID(), userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
  9. }

整体代码分析:

上述函数只是简单地调用ChunkSendertryAllocate方法。

2.4 ChunkSender::tryAllocate

职责:

调用MemoryManager的成员方法getChunk得到共享内存块或复用最后一次使用的共享内存块。

入参:

同上(略)

返回值:

指向共享内存块首地址的指针,类型为ChunkHeader

  1. template <typename ChunkSenderDataType>
  2. inline cxx::expected<mepoo::ChunkHeader*, AllocationError>
  3. ChunkSender<ChunkSenderDataType>::tryAllocate(const UniquePortId originId,
  4. const uint32_t userPayloadSize,
  5. const uint32_t userPayloadAlignment,
  6. const uint32_t userHeaderSize,
  7. const uint32_t userHeaderAlignment) noexcept
  8. {
  9. const auto chunkSettingsResult =
  10. mepoo::ChunkSettings::create(userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
  11. if (chunkSettingsResult.has_error())
  12. {
  13. return cxx::error<AllocationError>(AllocationError::INVALID_PARAMETER_FOR_USER_PAYLOAD_OR_USER_HEADER);
  14. }
  15. const auto& chunkSettings = chunkSettingsResult.value();
  16. const uint32_t requiredChunkSize = chunkSettings.requiredChunkSize();
  17. auto& lastChunkUnmanaged = getMembers()->m_lastChunkUnmanaged;
  18. mepoo::ChunkHeader* lastChunkChunkHeader =
  19. lastChunkUnmanaged.isNotLogicalNullptrAndHasNoOtherOwners() ? lastChunkUnmanaged.getChunkHeader() : nullptr;
  20. if (lastChunkChunkHeader && (lastChunkChunkHeader->chunkSize() >= requiredChunkSize))
  21. {
  22. /* * * * * 见代码段2-4-1:复用最近一次分配的共享内存 * * * * */
  23. }
  24. else
  25. {
  26. /* * * * * 见代码段2-4-2:分配一块新的未使用的共享内存 * * * * */
  27. }
  28. }

逐段代码分析:

  • LINE 09 ~ LINE 17: 计算所需共享内存大小。

  • LINE 19 ~ LINE 30: 判断最近一次分配的共享内存块是否所有订阅者都已读取,并且大小超过所需大小,则复用最近一次分配的共享内存块,否则新分配共享内存块。

代码段2-4-1:复用最近一次分配的共享内存

  1. auto sharedChunk = lastChunkUnmanaged.cloneToSharedChunk();
  2. if (getMembers()->m_chunksInUse.insert(sharedChunk))
  3. {
  4. auto chunkSize = lastChunkChunkHeader->chunkSize();
  5. lastChunkChunkHeader->~ChunkHeader();
  6. new (lastChunkChunkHeader) mepoo::ChunkHeader(chunkSize, chunkSettings);
  7. lastChunkChunkHeader->setOriginId(originId);
  8. return cxx::success<mepoo::ChunkHeader*>(lastChunkChunkHeader);
  9. }
  10. else
  11. {
  12. return cxx::error<AllocationError>(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
  13. }

整体代码分析:

如果正在使用的共享内存块未满,则插入,并析构之前的数据,同时在这块内存上构造新的ChunkHeader;否则返回错误。

代码段2-4-2:分配一块新的未使用的共享内存

  1. auto getChunkResult = getMembers()->m_memoryMgr->getChunk(chunkSettings);
  2. if (!getChunkResult.has_error())
  3. {
  4. auto& chunk = getChunkResult.value();
  5. // if the application allocated too much chunks, return no more chunks
  6. if (getMembers()->m_chunksInUse.insert(chunk))
  7. {
  8. // END of critical section
  9. chunk.getChunkHeader()->setOriginId(originId);
  10. return cxx::success<mepoo::ChunkHeader*>(chunk.getChunkHeader());
  11. }
  12. else
  13. {
  14. // release the allocated chunk
  15. chunk = nullptr;
  16. return cxx::error<AllocationError>(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
  17. }
  18. }
  19. else
  20. {
  21. /// @todo iox-#1012 use cxx::error<E2>::from(E1); once available
  22. return cxx::error<AllocationError>(cxx::into<AllocationError>(getChunkResult.get_error()));
  23. }

整体代码分析:

调用MemoryManager的成员方法getChunk获取共享内存块,如果获取成功,存入数组m_chunksInUse。如果获取失败或数组已满,则返回获取失败,此时根据RAII原理,SharedChunk的析构函数会自动将共享内存块返还给MemPool

m_chunksInUse内部封装的数组元素的类型为我们在上一篇文章中介绍的ShmSafeUnmanagedChunk,这个类型不具有引用计数,为什么退出作用域不会被析构?

为什么要存m_chunksInUse数组?原因如下:我们看到tryAllocate返回的是消息内存块的指针,而消息发送的时候需要使用SharedChunk,我们无法将前者转换为后者。所以,此处存入数组,消息发送函数中通过消息内存块的指针查找对应数组元素,恢复出SharedChunk实例,具体见3.3

3 消息发送逻辑

本质是往消息队列推入消息描述结构ShmSafeUnmanagedChunk

3.1 PublisherImpl::publish

职责:

上层应用程序调用此方法推送消息。

入参:

sample:用户负载数据的封装实例。

  1. template <typename T, typename H, typename BasePublisherType>
  2. inline void PublisherImpl<T, H, BasePublisherType>::publish(Sample<T, H>&& sample) noexcept
  3. {
  4. auto userPayload = sample.release(); // release the Samples ownership of the chunk before publishing
  5. auto chunkHeader = mepoo::ChunkHeader::fromUserPayload(userPayload);
  6. port().sendChunk(chunkHeader);
  7. }

整体代码分析:

上述代码从sample中取出用户负载数据指针,据此计算Chunk首地址,然后调用sendChunk进行发送。

根据用户负载数据指针计算Chunk首地址其实就是减去一个偏移量,具体计算方法如下:

  1. ChunkHeader* ChunkHeader::fromUserPayload(void* const userPayload) noexcept
  2. {
  3. if (userPayload == nullptr)
  4. {
  5. return nullptr;
  6. }
  7. uint64_t userPayloadAddress = reinterpret_cast<uint64_t>(userPayload);
  8. auto backOffset = reinterpret_cast<UserPayloadOffset_t*>(userPayloadAddress - sizeof(UserPayloadOffset_t));
  9. return reinterpret_cast<ChunkHeader*>(userPayloadAddress - *backOffset);
  10. }

其中偏移放在payload之前,即:*backOffset

3.2 PublisherPortUser::sendChunk

职责:

发送用户数据。

入参:

chunkHeaderChunkHeader类型的指针,Chunk的首地址。

  1. void PublisherPortUser::sendChunk(mepoo::ChunkHeader* const chunkHeader) noexcept
  2. {
  3. const auto offerRequested = getMembers()->m_offeringRequested.load(std::memory_order_relaxed);
  4. if (offerRequested)
  5. {
  6. m_chunkSender.send(chunkHeader);
  7. }
  8. else
  9. {
  10. m_chunkSender.pushToHistory(chunkHeader);
  11. }
  12. }

整体代码分析:

3.3 ChunkSender::send

职责:

发送用户数据。

入参:

chunkHeaderChunkHeader指针,Chunk的首地址。

  1. template <typename ChunkSenderDataType>
  2. inline uint64_t ChunkSender<ChunkSenderDataType>::send(mepoo::ChunkHeader* const chunkHeader) noexcept
  3. {
  4. uint64_t numberOfReceiverTheChunkWasDelivered{0};
  5. mepoo::SharedChunk chunk(nullptr);
  6. // BEGIN of critical section, chunk will be lost if the process terminates in this section
  7. if (getChunkReadyForSend(chunkHeader, chunk))
  8. {
  9. numberOfReceiverTheChunkWasDelivered = this->deliverToAllStoredQueues(chunk);
  10. getMembers()->m_lastChunkUnmanaged.releaseToSharedChunk();
  11. getMembers()->m_lastChunkUnmanaged = chunk;
  12. }
  13. // END of critical section
  14. return numberOfReceiverTheChunkWasDelivered;
  15. }

逐段代码分析:

  • LINE 05 ~ LINE 07: 根据chunkHeader指针和m_chunksInUse数组,恢复SharedChunk实例;

  • LINE 09 ~ LINE 09: 调用基类的成员方法deliverToAllStoredQueues向各队列发送(推入)消息;

  • LINE 11 ~ LINE 12: 更新m_lastChunkUnmanaged实例,以提升性能。

3.4 ChunkDistributor::deliverToAllStoredQueues

  1. template <typename ChunkDistributorDataType>
  2. inline uint64_t ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
  3. {
  4. uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U};
  5. typename ChunkDistributorDataType::QueueContainer_t remainingQueues;
  6. /* * * * * 见代码段3-3-1:向队列发送消息,失败入remainingQueues * * * * */
  7. /* * * * * 见代码段3-3-2:发送失败的不断尝试重新发送 * * * * */
  8. addToHistoryWithoutDelivery(chunk);
  9. return numberOfQueuesTheChunkWasDeliveredTo;
  10. }

整体代码分析:

这部分没有什么内容,主要实现在代码段3-3-1和代码段3-3-2。

代码段3-3-1:

  1. {
  2. {
  3. typename MemberType_t::LockGuard_t lock(*getMembers());
  4. bool willWaitForConsumer = getMembers()->m_consumerTooSlowPolicy == ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
  5. // send to all the queues
  6. for (auto& queue : getMembers()->m_queues)
  7. {
  8. bool isBlockingQueue = (willWaitForConsumer && queue->m_queueFullPolicy == QueueFullPolicy::BLOCK_PRODUCER);
  9. if (pushToQueue(queue.get(), chunk))
  10. {
  11. ++numberOfQueuesTheChunkWasDeliveredTo;
  12. }
  13. else
  14. {
  15. if (isBlockingQueue)
  16. {
  17. remainingQueues.emplace_back(queue);
  18. }
  19. else
  20. {
  21. ++numberOfQueuesTheChunkWasDeliveredTo;
  22. ChunkQueuePusher_t(queue.get()).lostAChunk();
  23. }
  24. }
  25. }
  26. }

整体代码分析:

这段代码整体上是遍历所有订阅者队列,调用pushToQueue向消息队列推入消息,实现消息发送。但是消息队列的长度是有限的,如果由于订阅者处理速度太慢,队列满了应该怎么处理,根据设置,可以选择两种应对策略:

  • 将队列保存下来(LINE 17 ~ LINE 20),后续对这些队列不断尝试发送,直到所有队列推送成功,见代码段3-3-2;

  • 将队列标记为有消息丢失(LINE 22 ~ LINE 25):

  1. template <typename ChunkQueueDataType>
  2. inline void ChunkQueuePusher<ChunkQueueDataType>::lostAChunk() noexcept
  3. {
  4. getMembers()->m_queueHasLostChunks.store(true, std::memory_order_relaxed);
  5. }

代码段3-3-2:不断尝试发送,直到所有消息发送成功

  1. cxx::internal::adaptive_wait adaptiveWait;
  2. while (!remainingQueues.empty())
  3. {
  4. adaptiveWait.wait();
  5. {
  6. typename MemberType_t::LockGuard_t lock(*getMembers());
  7. /* * * * * 见代码段3-3-3:与活跃队列求交 * * * * */
  8. for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i)
  9. {
  10. if (pushToQueue(remainingQueues[i].get(), chunk))
  11. {
  12. remainingQueues.erase(remainingQueues.begin() + i);
  13. ++numberOfQueuesTheChunkWasDeliveredTo;
  14. }
  15. if (i == 0U)
  16. {
  17. break;
  18. }
  19. }
  20. }
  21. }

整体代码分析:

这部分代码就是对剩余未发送成功的队列进行重新发送,直到所有队列发送成功。每轮尝试中间会使用yield或sleep函数等待一段时间,以免不必要的性能浪费。同时,发送过程中,还会与当前活跃队列求交,如下:

代码段3-3-3:与活跃队列求交

  1. typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size());
  2. auto greaterThan = [](memory::RelativePointer<ChunkQueueData_t>& a,
  3. memory::RelativePointer<ChunkQueueData_t>& b) -> bool {
  4. return reinterpret_cast<uint64_t>(a.get()) > reinterpret_cast<uint64_t>(b.get());
  5. };
  6. std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan);
  7. std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan);
  8. auto iter = std::set_intersection(getMembers()->m_queues.begin(),
  9. getMembers()->m_queues.end(),
  10. remainingQueues.begin(),
  11. remainingQueues.end(),
  12. queueIntersection.begin(),
  13. greaterThan);
  14. queueIntersection.resize(static_cast<uint64_t>(iter - queueIntersection.begin()));
  15. remainingQueues = queueIntersection;

整体代码分析:

上面这段代码就是求解remainingQueues和当前活跃队列m_queues交集,以免发生无限循环。set_intersection是C++标准库函数,详见:https://en.cppreference.com/w/cpp/algorithm/set_intersection

至此,消息发送的流程分析完毕。

4 小结

本文介绍了消息发布者获取共享内存块和发送逻辑,下文将介绍消息订阅者的接收逻辑。

原文链接:https://www.cnblogs.com/lijihong-jerry/p/18095668

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号