Class ERequestMonitorFeed

  • All Implemented Interfaces:
    IEFeed, IERequestMonitorFeed

    public final class ERequestMonitorFeed
    extends EFeed
    implements IERequestMonitorFeed
    ERequestMonitorFeed is the application entry point for monitoring request messages to and reply messages from local repliers. This feed cannot be used to monitor messages between remote requesters and repliers. Follow these steps to use this feed when monitoring requests and replies:

    Step 1: Implement ERequestMonitor interface.

    Step 2: Build a request monitor feed for the ERequestMonitor instance and a type+topic request message key. Note that the feed scope is set to FeedScope.LOCAL and cannot be overridden.

    Use ERequestMonitorFeed.Builder.requestCallback(MessageCallback), ERequestMonitorFeed.Builder.replyCallback(MessageCallback), and ERequestMonitorFeed.Builder.cancelCallback(MessageCallback) to set Java lambda expressions used in place of ERequestMonitor interface methods.

    Step 3: Start monitoring requests to and replies from local repliers. Please note that unlike other eBus roles (publisher, subscriber, etc.) the request monitor is not informed when the request feed is up or down. That is because the monitor is not part of this conversation but is standing to the side watching the message flow.

    Step 4: ERequestMonitor receives request messages, reply messages, and cancellations flowing between requesters and local repliers.

    Step 5: When request monitor is shutting down, stop monitoring and close the monitor feed.

    Example use of ERequestMonitorFeed

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import net.sf.eBus.client.ERequestMonitor;
    import net.sf.eBus.client.ERequestMonitorFeed;
    import net.sf.eBus.client.EScheduledExecutor;
    import net.sf.eBus.client.IETimer;
    import net.sf.eBus.messages.EMessageKey;
    import net.sf.eBus.messages.EReplyMessage;
    import net.sf.eBus.messages.ERequestMessage;
    
    // Step 1: Implement ERequestMonitor interface.
    public class CatalogRequestMonitor implements ERequestMonitor {
        private static final Duration REPLY_DELAY = Duration.ofHours(3L);
    
        private final EMessageKey mKey;
        private final Map<String, RequestInfo> mOrders;
        private ERequestMonitorFeed mFeed;
        private IETimer mReplyTimer;
    
        public CatalogRequestMonitor(final String subject) {
            mKey = new EMessageKey(com.acme.CatalogOrder.class, subject);
            mOrders = new HashMap<>();
        }
    
        @Override public void startup() {
            try
            {
                final EScheduledExecutor executor = EScheduledExecutor.getExecutor("blocking");
    
                // Step 2: Open a request monitor feed for request message key. May override ERequestMonitor interface methods.
                mFeed = (ERequestMonitorFeed.builder()).target(this)
                                                       .messageKey(mKey)
                                                       .build();
    
                // Step 3: Start monitoring.
                mFeed.monitor();
    
                mReplyTimer = executor.scheduleAtFixedRate(this::onReplyTimeout, this, REPLY_DELAY, REPLY_DELAY);
            } catch (IllegalArgumentException argex) {
                // Monitoring failed. Place recovery code here.
            }
        }
    
        // Step 4: Wait for requests, replies, and cancellations to arrive.
        @Override public void request(final ERequestMessage request, final String replier) {
            final CatalogOrder order = (CatalogOrder) request;
    
            // No need to synchronize orders map since all callbacks are on this object's dispatcher.
            mOrders.put(order.orderId, new RequestInfo(order, replier));
        }
    
        @Override public void reply(final EReplyMessage reply, final String replier) {
            final CatalogOrderReply orderReply = (CatalogOrderReply) reply;
            final String orderId = orderReply.orderId;
    
            // Is this the final reply?
            if (orderReply.isFinal) {
                // Yes, stop monitoring the order.
                mOrders.remove(orderId);
            } else {
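                // No, update order timestamp since replier is actively working this order.
                // The order may be unknown if its request arrived before monitoring started.
                final RequestInfo info = mOrders.get(orderId);

                if (info != null) {
                    info.updateTimestamp();
                }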
            }
        }
    
        @Override public void cancel(final ERequestMessage request, final String replier) {
            final CatalogOrder order = (CatalogOrder) request;
    
            // Remove this order since it is now defunct.
            mOrders.remove(order.orderId);
        }
    
        @Override public void shutdown() {
            mOrders.clear();
    
            // Step 5: When shutting down, either unmonitor or close the request monitor feed.
            if (mFeed != null) {
                mFeed.close();
                mFeed = null;
            }
        }
    
        private void onReplyTimeout() {
            // Iterate over orders looking for failed replies.
            for (RequestInfo info : mOrders.values()) {
                // If timed out waiting for a reply, raise an alarm.
            }
        }
    }
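    The example above stores RequestInfo instances and calls updateTimestamp() on them, but the class itself is not shown. The following is a minimal sketch of what such a helper might look like; it is not part of eBus. The field names, the isTimedOut method, and the use of a plain Object for the order are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper class: NOT part of eBus. It only illustrates the
// bookkeeping that the CatalogRequestMonitor example relies on.
final class RequestInfo {
    private final Object mOrder;     // the monitored request (CatalogOrder in the example)
    private final String mReplier;   // name of the replier handling the request
    private Instant mTimestamp;      // time of the request or the latest non-final reply

    RequestInfo(final Object order, final String replier) {
        this(order, replier, Instant.now());
    }

    // Explicit timestamp constructor, useful for testing.
    RequestInfo(final Object order, final String replier, final Instant timestamp) {
        mOrder = order;
        mReplier = replier;
        mTimestamp = timestamp;
    }

    Object order() { return mOrder; }

    String replier() { return mReplier; }

    // Called when a non-final reply shows the replier is still working the order.
    void updateTimestamp() {
        updateTimestamp(Instant.now());
    }

    void updateTimestamp(final Instant now) {
        mTimestamp = now;
    }

    // Returns true if more than the given limit has elapsed since the last activity.
    boolean isTimedOut(final Instant now, final Duration limit) {
        return Duration.between(mTimestamp, now).compareTo(limit) > 0;
    }
}
```

    With a helper along these lines, onReplyTimeout could call isTimedOut(Instant.now(), REPLY_DELAY) on each map value and raise an alarm for any order that returns true.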

    Please note: there currently is no matching multi-request monitor feed.

    Author:
    Charles W. Rapp
    See Also:
    ERequestMonitor
    • Field Detail

      • mFeedType

        protected final net.sf.eBus.client.ESingleFeed.FeedType mFeedType
        Specifies whether this is a publish, subscribe, request, or reply feed.
      • mSubject

        protected final net.sf.eBus.client.ESubject mSubject
        The feed interfaces with this eBus subject. The subject type is based on feed type.
      • mActivationCount

        protected int mActivationCount
        Tracks the number of contra-feeds matched to this feed. If this is a publisher feed, then counts subscriber feeds. If a subscriber feed, then counts publisher feeds.
    • Method Detail

      • inactivate

        protected void inactivate()
        If request monitor is in place, then retract this monitor feed.
        Specified by:
        inactivate in class EFeed
        See Also:
        EFeed.close()
      • monitor

        public void monitor()
        Adds this request monitor feed to associated request subject. If this monitor feed is currently in place, then does nothing.
        Specified by:
        monitor in interface IERequestMonitorFeed
        Throws:
        java.lang.IllegalStateException - if this feed is closed.
        See Also:
        unmonitor(), EFeed.close()
      • unmonitor

        public void unmonitor()
        Retracts this request monitor feed from associated request subject. Does nothing if this feed is not currently in place.
        Specified by:
        unmonitor in interface IERequestMonitorFeed
        Throws:
        java.lang.IllegalStateException - if this feed is closed.
        See Also:
        monitor(), EFeed.close()
      • isMonitoring

        public boolean isMonitoring()
        Returns true if this request monitor feed is both open and monitoring; otherwise returns false.
        Returns:
        true if this request monitor feed is both open and monitoring.
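        The lifecycle described by monitor(), unmonitor(), isMonitoring(), and EFeed.close() can be sketched as a small state model. The class below is a stand-in written for illustration only; it does not use or reproduce the eBus implementation. Its name, fields, and the subjectAdds counter are hypothetical, and it assumes that unmonitor() throws IllegalStateException under the same condition as monitor(), namely when the feed is closed.

```java
// Stand-in model of the documented feed lifecycle. NOT the eBus implementation;
// all names here are hypothetical.
final class MonitorLifecycleSketch {
    private boolean mOpen = true;       // becomes false once close() is called
    private boolean mInPlace = false;   // true while the feed is added to its subject
    private int mSubjectAdds = 0;       // how many times the feed was actually added

    // Adds the feed to its subject; does nothing if it is already in place.
    void monitor() {
        if (!mOpen) {
            throw new IllegalStateException("feed is closed");
        }
        if (!mInPlace) {
            mInPlace = true;
            ++mSubjectAdds;
        }
    }

    // Retracts the feed from its subject; does nothing if it is not in place.
    // Assumption: throws on a closed feed, mirroring monitor().
    void unmonitor() {
        if (!mOpen) {
            throw new IllegalStateException("feed is closed");
        }
        mInPlace = false;
    }

    // True only when the feed is both open and monitoring.
    boolean isMonitoring() {
        return mOpen && mInPlace;
    }

    // Closing retracts the monitor if it is in place and marks the feed closed.
    void close() {
        mInPlace = false;
        mOpen = false;
    }

    int subjectAdds() {
        return mSubjectAdds;
    }
}
```

        In this model a second monitor() call leaves the feed added once, and any monitor() call after close() fails, which matches the behavior documented above.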
      • builder

        public static ERequestMonitorFeed.Builder builder()
        Returns a new ERequestMonitorFeed builder instance. This instance is used to build a single request monitor feed instance and should not be used to create multiple such feeds.
        Returns:
        new request monitor feed builder.
      • toString

        public java.lang.String toString()
        Returns a string containing the feed message key and data member values.
        Overrides:
        toString in class EFeed
        Returns:
        textual representation of this feed.
      • key

        public final EMessageKey key()
        Returns the feed message key. The message key type matches the feed type. For example, if this is an EPublishFeed, then a notification message key is returned.
        Returns:
        message key.
      • messageSubject

        public final java.lang.String messageSubject()
        Returns the feed message key subject.
        Returns:
        message key subject.
      • activationCount

        public final int activationCount()
        Returns the feed activation count.
        Returns:
        value ≥ zero.