From 84a791386cdbd24f6baf31b653397b0a9c91a633 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Thu, 23 Jun 2016 09:41:57 -0400 Subject: [PATCH] feat(disposition): support undeliverable-here in modified outcomes Previously, specifying `undeliverable-here` as `true` in a modified outcome simply resulted in the message being rejected. This patch adds tracking to the outgoing link management in order to not redeliver messages to links that indicate that messages are not deliverable there. --- qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 31 ++++++++++++++++++---- qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 26 ++++++++++++++++-- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 90c268418fb..20f32a1b372 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -69,7 +69,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, buffer(1024)/*used only for header at present*/, //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)), - cancelled(false) + cancelled(false), + trackingUndeliverableMessages(false) { for (size_t i = 0 ; i < deliveries.capacity(); ++i) { deliveries[i].init(i); @@ -142,11 +143,17 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) if (preAcquires()) { //TODO: handle message-annotations if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) { - //treat undeliverable here as rejection - queue->reject(r.cursor); - } else { - queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery))); + if (!trackingUndeliverableMessages) { + // observe queue for changes to track undeliverable messages + queue->getObservers().add( + boost::dynamic_pointer_cast(shared_from_this())); + trackingUndeliverableMessages = true; + } + + undeliverableMessages.add(r.msg.getSequence()); } + + queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery))); } outgoingMessageRejected();//TODO: not quite true... break; @@ -168,6 +175,13 @@ bool OutgoingFromQueue::canDeliver() void OutgoingFromQueue::detached(bool closed) { QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName()); + + if (trackingUndeliverableMessages) { + // stop observation of the queue + queue->getObservers().remove( + boost::dynamic_pointer_cast(shared_from_this())); + } + queue->cancel(shared_from_this()); //TODO: release in a clearer order? for (size_t i = 0 ; i < deliveries.capacity(); ++i) { @@ -279,6 +293,7 @@ bool match(const std::string& filter, const std::string& target) bool OutgoingFromQueue::filter(const qpid::broker::Message& m) { + if (undeliverableMessages.contains(m.getSequence())) return false; return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey())) && (!selector || selector->filter(m)); } @@ -334,5 +349,11 @@ boost::shared_ptr OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoi else return boost::shared_ptr(); } +void OutgoingFromQueue::dequeued(const qpid::broker::Message &m) +{ + if (undeliverableMessages.contains(m.getSequence())) { + undeliverableMessages.remove(m.getSequence()); + } +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index d3825d08945..c56c8c0bf3c 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -24,6 +24,7 @@ #include "qpid/broker/amqp/Message.h" #include "qpid/broker/amqp/ManagedOutgoingLink.h" #include "qpid/broker/Consumer.h" +#include "qpid/broker/QueueObserver.h" #include #include @@ -36,12 +37,19 @@ namespace qpid { namespace sys { class OutputControl; } + +namespace framing { +class SequenceSet; +} + namespace broker { class Broker; class Queue; class Selector; + namespace amqp { class Session; + template class CircularArray { @@ -75,6 +83,7 @@ class Outgoing : public ManagedOutgoingLink * Called when a delivery is writable */ virtual void handle(pn_delivery_t* delivery) = 0; + void wakeup(); virtual ~Outgoing() {} protected: @@ -85,7 +94,9 @@ class Outgoing : public ManagedOutgoingLink * Logic for handling an outgoing link from a queue (even if it is a * subscription pseduo-queue created by the broker) */ -class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this +class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, + public boost::enable_shared_from_this, + public qpid::broker::QueueObserver { public: OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr q, pn_link_t* l, Session&, @@ -100,7 +111,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public bool canDeliver(); void detached(bool closed); - //Consumer interface: + // Consumer interface: bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); void notify(); bool accept(const qpid::broker::Message&); @@ -110,6 +121,14 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public qpid::broker::OwnershipToken* getSession(); static boost::shared_ptr getExclusiveSubscriptionQueue(Outgoing*); + // QueueObserver interface + virtual void enqueued(const qpid::broker::Message&) {}; + virtual void acquired(const qpid::broker::Message&) {}; + virtual void requeued(const qpid::broker::Message&) {}; + virtual void dequeued(const qpid::broker::Message&); + virtual void consumerAdded(const qpid::broker::Consumer&) {}; + virtual void consumerRemoved(const qpid::broker::Consumer&) {}; + private: struct Record @@ -145,6 +164,9 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::scoped_ptr selector; bool unreliable; bool cancelled; + + bool trackingUndeliverableMessages; + qpid::framing::SequenceSet undeliverableMessages; }; }}} // namespace qpid::broker::amqp