#ifndef XENIUM_RAMALHETE_QUEUE_HPP
#define XENIUM_RAMALHETE_QUEUE_HPP
#include <xenium/acquire_guard.hpp>
#include <xenium/backoff.hpp>
#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/detail/pointer_queue_traits.hpp>

#include <atomic>
#include <stdexcept>
#include <type_traits>
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to alignment specifier
#endif
namespace xenium {

namespace policy {
  template <unsigned Value>
  struct pop_retries;
}
template <class T, class... Policies>
class ramalhete_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  // backoff is used by the retry loop in try_pop; no_backoff as the default is an assumption
  using backoff = parameter::type_param_t<policy::backoff, no_backoff, Policies...>;
  static constexpr unsigned entries_per_node =
    parameter::value_param_t<unsigned, policy::entries_per_node, 512, Policies...>::value;
  static constexpr unsigned pop_retries =
    parameter::value_param_t<unsigned, policy::pop_retries, 1000, Policies...>::value;
  static_assert(entries_per_node > 0, "entries_per_node must be greater than zero");
  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");
  // rebind the queue type with additional policies
  template <class... NewPolicies>
  using with = ramalhete_queue<T, Policies..., NewPolicies...>;

  ramalhete_queue();
  ~ramalhete_queue();
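  // Pushes the given value to the queue.
  // `value` must not be a nullptr; otherwise a std::invalid_argument exception is thrown.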
  void push(value_type value);
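  // Tries to pop an element from the queue. Returns true and stores the element
  // in `result` on success; returns false if the queue appears to be empty.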
  [[nodiscard]] bool try_pop(value_type& result);
private:
  struct node;

  using concurrent_ptr = typename reclaimer::template concurrent_ptr<node, 0>;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  // a value pointer with one mark bit; marked_value(nullptr, 1) denotes a slot
  // that has been invalidated by a consumer
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 1>;

  struct entry {
    std::atomic<marked_value> value;
  };
  static constexpr unsigned step_size = 11;
  static constexpr unsigned max_idx = step_size * entries_per_node;
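  // A node is a fixed-size array of entries plus a link to the next node. push_idx
  // and pop_idx advance in units of step_size so that consecutive operations touch
  // entries on different cache lines; the actual slot is the index modulo
  // entries_per_node, and an index >= max_idx means the node is exhausted.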
  struct node : reclaimer::template enable_concurrent_ptr<node> {
    std::atomic<unsigned> pop_idx;
    entry entries[entries_per_node];
    std::atomic<unsigned> push_idx;
    concurrent_ptr next;
    // a node created by push already holds its first value, so push_idx starts at step_size
    node(raw_value_type item) : pop_idx{0}, push_idx{step_size}, next{nullptr} {
      entries[0].value.store(item, std::memory_order_relaxed);
      for (unsigned i = 1; i < entries_per_node; i++)
        entries[i].value.store(nullptr, std::memory_order_relaxed);
    }
    ~node() {
      for (unsigned i = pop_idx; i < push_idx; i += step_size) {
        traits::delete_value(entries[i % entries_per_node].value.load(std::memory_order_relaxed).get());
      }
    }
  };
  alignas(64) concurrent_ptr head;
  alignas(64) concurrent_ptr tail;
};
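// Example usage (illustrative sketch only; the reclamation scheme named below is an
// assumption - substitute whatever reclaimer your xenium version provides):
//
//   using queue_t = xenium::ramalhete_queue<
//     int*,
//     xenium::policy::reclaimer<xenium::reclamation::epoch_based<>>,
//     xenium::policy::entries_per_node<1024>>;
//
//   queue_t q;
//   q.push(new int(42));
//   int* v = nullptr;
//   if (q.try_pop(v)) {
//     // consume *v ...
//     delete v;
//   }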
template <class T, class... Policies>
ramalhete_queue<T, Policies...>::ramalhete_queue() {
  // start with a single empty dummy node; reset push_idx since its pre-filled entry is nullptr
  auto n = new node(nullptr);
  n->push_idx.store(0, std::memory_order_relaxed);
  head.store(n, std::memory_order_relaxed);
  tail.store(n, std::memory_order_relaxed);
}
template <class T, class... Policies>
ramalhete_queue<T, Policies...>::~ramalhete_queue() {
  // the destructor never runs concurrently with other operations, so we can
  // simply walk the list and delete every remaining node
  auto n = head.load(std::memory_order_acquire);
  while (n) {
    auto next = n->next.load(std::memory_order_acquire);
    delete n.get();
    n = next;
  }
}
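// push reserves a slot in the tail node with a fetch_add on push_idx and publishes
// the value into that slot with a single CAS. If the node is exhausted, a new node
// (pre-filled with the value) is appended and tail is advanced, helping other
// threads that race on the same transition.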
template <class T, class... Policies>
void ramalhete_queue<T, Policies...>::push(value_type value) {
  raw_value_type raw_val = traits::get_raw(value);
  if (raw_val == nullptr)
    throw std::invalid_argument("value cannot be nullptr");
  guard_ptr t;
  for (;;) {
    t.acquire(tail, std::memory_order_acquire);

    unsigned idx = t->push_idx.fetch_add(step_size, std::memory_order_relaxed);
    if (idx >= max_idx) {
      // this node is full
      if (t != tail.load(std::memory_order_relaxed))
        continue; // some other thread already appended a node and updated tail
      auto next = t->next.load(std::memory_order_relaxed);
      if (next == nullptr) {
        node* new_node = new node(raw_val);
        traits::release(value);

        marked_ptr expected = nullptr;
        if (t->next.compare_exchange_strong(expected, new_node,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {
          // we appended the new node -> try to swing tail to it
          expected = t;
          tail.compare_exchange_strong(expected, new_node, std::memory_order_release, std::memory_order_relaxed);
          return;
        }
        // Some other thread appended a node first. Reset push_idx so the node
        // destructor does not delete the still-owned value, then discard the node.
        new_node->push_idx.store(0, std::memory_order_relaxed);
        delete new_node;
      } else {
        // help swing tail to the node some other thread already appended
        next = t->next.load(std::memory_order_acquire);
        marked_ptr expected = t;
        tail.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed);
      }
      continue;
    }
    idx %= entries_per_node;

    marked_value expected = nullptr;
    if (t->entries[idx].value.compare_exchange_strong(expected, raw_val, std::memory_order_release, std::memory_order_relaxed)) {
      traits::release(value);
      return;
    }
  }
}
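// try_pop reserves a slot in the head node with a fetch_add on pop_idx. If the slot
// is still empty because the corresponding push has not finished, it optionally spins
// (pop_retries) and finally invalidates the slot with a marked exchange, forcing that
// push to retry with a new slot. A drained head node is unlinked and reclaimed.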
template <class T, class... Policies>
bool ramalhete_queue<T, Policies...>::try_pop(value_type& result) {
  guard_ptr h;
  for (;;) {
    h.acquire(head, std::memory_order_acquire);
    const auto pop_idx = h->pop_idx.load(std::memory_order_acquire);
    // the acquire on pop_idx prevents seeing an up-to-date pop_idx together with
    // an out-of-date push_idx, which would make a non-empty queue look empty
    const auto push_idx = h->push_idx.load(std::memory_order_relaxed);
    if (pop_idx >= push_idx && h->next.load(std::memory_order_relaxed) == nullptr)
      return false;
    unsigned idx = h->pop_idx.fetch_add(step_size, std::memory_order_release);
    if (idx >= max_idx) {
      // this node has been drained; move on to the next one, if any
      auto next = h->next.load(std::memory_order_acquire);
      if (next == nullptr)
        return false; // no more nodes in the queue
      marked_ptr expected = h;
      if (head.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed))
        h.reclaim(); // the old node has been unlinked -> hand it to the reclaimer
      continue;
    }
    idx %= entries_per_node;
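    // Read the reserved slot; it may still be nullptr if the push that reserved
    // the same slot has not stored its value yet.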
    auto value = h->entries[idx].value.load(std::memory_order_relaxed);
    if constexpr (pop_retries > 0) {
      unsigned cnt = 0;
      ramalhete_queue::backoff retry_backoff;
      while (value == nullptr && ++cnt <= pop_retries) {
        value = h->entries[idx].value.load(std::memory_order_relaxed);
        retry_backoff();
      }
    }
    if (value != nullptr) {
      // this acquire-reload is not dead code: it establishes the synchronize-with
      // relationship with the release-CAS in push that published the value
      h->entries[idx].value.load(std::memory_order_acquire);
      traits::store(result, value.get());
      return true;
    }
    // the slot is still empty - invalidate it with a marked nullptr so the pending
    // push has to retry with a new slot; if the exchange returns a value, that push
    // finished in the meantime and the value is ours
    value = h->entries[idx].value.exchange(marked_value(nullptr, 1), std::memory_order_acquire);
    if (value != nullptr) {
      traits::store(result, value.get());
      return true;
    }