eBus v. 7.6.0
API Specification
Version: 7.6.0
Introduction
eBus provides two services: message routing and object task execution. Messages are routed between objects that are either in the same process, between processes on the same host, or between processes on different hosts - and the objects are unaware of where the other object resides. 1 There are two types of message routing: Notification and Request/Reply. eBus uses a broker-less, type+topic-based message routing architecture. Broker-less means that a separate message routing server is not used. Rather messages are transmitted directly between objects via the eBus API. Type+topic message routing means that the message class together with the message subject are used to route the message to subscribers. This technique provides type safety missing from topic-only routing. eBus refers to the type+topic combination as a message key.
Object execution is when eBus passes inbound messages to an
application object method asynchronously using a
dispatcher thread. These threads
may be grouped together and associated with one or more
application classes. If an application class is not
assigned to a dispatcher group, then it is assigned to the
default dispatcher group. An eBus client is run by only one
dispatcher thread at any given time. While the client may
be run by different dispatchers over time, only one
dispatcher will run the client at any given moment ("run a
client" means executing the client callback tasks). See
EConfigure.Dispatcher
for more
information on configuring eBus dispatcher threads.
Messages are delivered to eBus clients asynchronously using dispatcher threads.
eBus maintains a weak reference to application objects which act as eBus clients. When eBus detects that a client is finalized, eBus automatically cleans up all the client's extant eBus feeds. Even so, it is better for an application to explicitly close its feeds when an application object is no longer needed rather than depend on automatic clean up.
The eBus Programmer's Manual covers the following information in greater detail.
eBus (as of release 5.0.0) is a Maven project deployed to Maven Central. All further releases will be available through Maven Central and make be accessed using the Maven dependency:
<dependency>
<groupId>net.sf.ebus</groupId>
<artifactId>core</artifactId>
<version>7.6.0</version>
</dependency>
Gradle (long)
implementation group: 'net.sf.ebus', name: 'core', version: '7.6.0'
Gradle (short)
implementation 'net.sf.ebus:core:7.6.0'
Note: As of eBus release 6.0.0 this library is now based on Java 11. Previous versions are Java 1.8-based. Use appropriate version.
eBus Messages
An eBus message is defined by extending one of the following classes and providing a "builder" method to create the message from the deserialized fields:
Message fields must consist of
public final
data members whose types from the
following list:
boolean | Boolean |
BigInteger |
BigDecimal |
byte | Byte |
char | Character |
Class |
Date |
double | Double |
Duration |
EField |
EFieldList |
EMessage |
EMessageList |
EMessageKey |
enum |
File |
float | Float |
net.sf.eBusx.geo GeoJSON Types |
InetAddress |
InetSocketAddress |
Instant |
int | Integer |
LocalDate |
LocalDateTime |
LocalTime |
long | Long |
MonthDay |
OffsetTime |
OffsetDateTime |
Decimal |
Period |
short | Short |
String |
URI |
UUID |
Year |
YearMonth |
ZonedDateTime |
ZoneId |
ZoneOffset |
<field type>[] |
Any eBus field type can be turned into an
homogenous
2
array by appending []
to the field name.
An eBus message class must provide a public static
method literally named builder()
which returns an
instance of a public static
inner builder class.
This builder class must extend the builder class matching
the message base. That means if the message base class is:
-
ENotificationClass
, then builder extendsENotificationClass.Builder<M, B>
, -
ERequestClass
, the builder extendsERequestClass.Builder<M, B>
, and -
EReplyClass
, then builder extendsEReplyClass.Builder<M, B>
.
where M
is the target class generated by the
builder and B
is the inner class builder. The
reason for B
is for setters defined in the super
class. Builder setter methods return the builder instance
which allows for setters to be chained
(builder.setHeight(...).setWeight(...).setAge(...);
).
Super class setters maintain this chain by
return ((B) this)
.
long
value
where each boolean is a single bit within that long
.
Consider the following: a user message extends
EReplyMessage
with class
OrderReply
. EReplyMessage
has two fields of
its own:
replyStatus
and
replyReason
.
EReplyMessage
extends EMessage
which has
another two fields:
subject
and
timestamp
.
The example OrderReply
message class and
OrderPart
message field class are:
The builder set method names must exactly match
the message field names. The OrderReply
fields
names are: parts
, shippingCost
, and
totalPrice
, so the builder must have setter method
names which match those three.
import java.math.BigDecimal; import java.math.BigDecimal;
import net.sf.eBus.messages.EReplyMessage; import net.sf.eBus.messages.EField;
public final class OrderReply public final class OrderPart
extends EReplyMessage extends EField
implements Serializable implements Serializable
{ {
@FieldDisplayIndex(index = 0) @FieldDisplayIndex(index = 0)
public final OrderPart[] parts; public final String partId;
@FieldDisplayIndex(index = 1) @FieldDisplayIndex(index = 1)
public final BigDecimal shippingCost; public final int quantity;
@FieldDisplayIndex(index = 2) @FieldDisplayIndex(index = 2)
public final BigDecimal totalPrice; public final BigDecimal pricePerPart;
private static final long serializeVersionUID = 0x1L; private static final long serializeVersionUID = 0x1L;
// Required "de-serialization" constructor. // Required "de-serialization" constructor.
// Private because only the builder may access this // Private because only the builder may access this
// constructor. // constructor.
private OrderReply(final OrderReplyBuilder builder) { private OrderPart(final OrderPartBuilder builder) {
super (builder); super(builder);
this.parts = builder.mParts; this.partId = builder.mPartId;
this.shippingCost = builder.mShipping; this.quantity = builder.mQuantity;
this.totalPrice = builder.mTotalPrice; this.pricePerPart = builder.mPrice;
} }
public BigDecimal totalPrice() {
return (pricePerPart.multiply(BigDecimal.valueOf((long) quantity)));
}
public static OrderReplyBuilder builder() { public static final class OrderReplyBuilder {
return (new OrderReplyBuilder()); return (new OrderPartBuilder());
} }
public static final class OrderPartBuilder public static final class OrderPartBuilder
extends EReplyMessage.Builder<OrderReply, OrderReplyBuilder> extends EField.Builder<OrderPart, OrderPartBuilder>
{ {
private OrderPart[] mParts; private String mPartId;
private BigDecimal mShipping; private int mQuantity;
private BigDecimal mTotalPrice; private BigDecimal mPrice;
private OrderReplyBuilder() { private OrderPartBuilder() {
super (OrderReply.class) super (OrderPart.class);
} }
public OrderReplyBuilder parts(final OrderPart[] parts) { public OrderPartBuilder partId(final String id) {
if (parts == null || parts.length == 0) { if (id == null || id.isEmpty()) {
throw ( throw (
new IllegalArgumentException( new IllegalArgumentException(
"parts is null or empty")); "id is null or empty"));
} }
mParts = parts; mPartId = id;
return (this); return (this);
} }
public OrderReplyBuilder shippingCost(final double cost) { public OrderPartBuilder quantity(final int quantity) {
if (cost < 0d) { if (quantity < 0) {
throw (new IllegalArgument("const < zero")); throw (new IllegalArgumentException("quantity < zero"));
} }
mShipping = BigDecimal.valueOf(cost); mQuantity = quantity;
return (this); return (this);
} }
public OrderReplyBuilder totalPrice(final double px) { public OrderPartBuilder pricePerPart(final double px) {
if (px < 0d) { if (px < 0d) {
throw (new IllegalArgumentException("px < zero")); throw (new IllegalArgumentException("px < zero"));
} }
mTotalPrice = BigDecimal.valueOf(px); mPrice = BigDecimal.valueOf(px);
return (this); return (this);
} }
// Checks if this builder contains a valid message. // Checks if this builder contains a valid message.
// This method is called by EMessageObject.Builder.build() // This method is called by EMessageObject.Builder.build()
// If there are no problems, then Builder.buildImpl() // If there are no problems, then Builder.buildImpl()
// is called which generates the target message; // is called which generates the target message;
// otherwise a net.sf.eBus.messages.ValidationException is // otherwise a net.sf.eBus.messages.ValidationException is
// thrown. // thrown.
// See EMessageObject
for a
// detailed message validation example.
@Override @Override
protected void validate(final Validator probs) { protected void validate(final Validator probs) {
return (super.validate(probs) return (super.validate(probs)
.requireNotNull(mParts, "parts") .requireNotNull(mPartId, "partId")
.requireNotNull(mShipping, "shippingCost") .requireTrue(v -> (mQuantity > 0), "quantity", Validator.NOT_SET)
.requireNotNull(mTotalPrice, "totalPrice") .requireNotNull(mPrice, "pricePerPart")
} }
// Builder contains valid message. Time to create the target // Builder contains valid message. Time to create the target
// message instance. // message instance.
@Override protected OrderReply buildImpl() @Override protected OrderPart buildImpl()
{ {
return (new OrderReply(this)); return (new OrderPart(this));
} }
} }
} }
Request messages extend
ERequestMessage
and include a
EReplyInfo
annotation
which declares which messages may be sent in reply to the
request. EReplyInfo
is accumulative. The allowed
reply messages include those listed in the current request
message and its super classes. Because
ERequestMessage
has a EReplyInfo
which
includes EReplyMessage
. This means that
EReplyMessage
may be sent in reply to all request
messages because all request messages ultimately extend
ERequestMessage
.
(eBus release 5.6.0 introduced a new EReplyInfo
attribute mayClose
. To learn more about how this
attribute works see
Canceling an Active Request)
import net.sf.eBus.messages.EReplyInfo;
import net.sf.eBus.messages.EReplyMessage;
import net.sf.eBus.messages.ERequestMessage
@EReplyInfo (replyMessageClasses = {OrderReply.class}, mayClose = false)
public final class OrderRequest
extends ERequestMessage
implements Serializable
{
public final OrderPart[] parts;
private static final long serializeVersionUID = 0x1L;
private OrderRequest(final OrderRequestBuilder builder)
{
super (builder);
this.parts = builder.mParts;
}
public static OrderRequestBuilder builder() {
return (new OrderRequestBuilder());
}
public static final class OrderRequestBuilder
extends ERequestMessage.Builder<OrderRequest, OrderRequestBuilder>
{
private OrderPart mParts;
private OrderRequestBuilder() {
super (OrderRequest.class);
}
public OrderRequestBuilder parts(final OrderPart[] parts) {
if (parts == null || parts.length == 0) {
throw (new IllegalArgumentException("parts is null or empty"));
}
mParts = parts;
return (this);
}
@Override protected OrderRequest buildImpl() {
return (new OrderRequest(this));
}
@Override protected void validate(final Validator problems) {
return (super.validate(problems)
.requireNotNull(mParts, "parts"));
}
}
}
The messages which may be sent in reply to an
OrderRequest
are ERequestMessage
and
OrderReply
. If
EReplyFeed.ERequest.reply(EReplyMessage)
is passed a message that is not one of the two supported
reply message types, then an
IllegalArgumentException
is thrown in response.
Local Message
At the start of this section, it was stated that message
fields must be one of the
supported eBus data types. This
is not completely true. It is only necessary for messages
which are intended for transmission to another eBus
application. But if the message is intended for use
within the local JVM only, then any valid Java type may
be used. Such messages are marked by the
@ELocalOnly
class-level annotation.
Local-only messages do not define public static
builder
method, public static inner builder
class, and do not need to define a builder-accessible
constructor. But the message fields must still be
public final
. The following code shows how to
implement a local-only eBus message.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.sf.eBus.messages.ELocalOnly;
import net.sf.eBus.message.ENotificationMessage;
@ELocalOnly public final class LocalMessage
extends ENotificationMessage
implements Serializable
{
// Map is not an eBus-supported type but allowed in this local message
public final Map<String, Integer> studentIds;
private static final long serializeVersionUID = 0x1L;
private LocalMessage(final Builder builder) {
super (builder);
studentIds = Collections.unmodifiableMap(builder.mStudentIds);
}
public static Builder builder() {
return (new Builder());
}
public static final class Builder
extends ENotificationMessage.Builder<LocalMessage, Builder>
{
private final Map<String, Integer> mStudentIds;
private Builder() {
super (LocalMessage.class);
mStudentIds = new HashMap<>l;();
}
public Builder studentIds(final Map<String, Integer> ids) {
mStudentIds.putAll(ids);
return (this);
}
public Builder addStudentId(final String name, final int id) {
mStudentIds.put(name, id);
return (this);
}
public Validator validate(final Validator problems) {
return (super.validate(problems)
.requireTrue(v -> (!v.isEmpty()), mStudentIds, "studentIds", Validator.NOT_SET));
}
public LocalMessage buildImpl() {
return (new LocalMessage(this));
}
}
}
Message Field Annotations
The following field-level annotations may be applied to message fields of the specified type.
String: @EStringInfo
eBus uses java.nio.charset.StandardCharsets.UTF_8
as the default character set for serializing/de-serializing
Java java.lang.String
fields. This default
character set can be overridden using the
@EStringInfo
annotation for
String
message fields:
@EStringInfo
has a second attribute
lineCount
used to specify the number of lines that
may be in the string field. If not defined, the line count
defaults to a single line. lineCount
is not
used by eBus and is provided solely for application use.
One possible use for this attribute is deciding whether to
use a JavaFX TextField
to input single line text or
TextArea
for multi-line text.
@EStringInfo(charset="latin1") public final String lastName;
where charset
should be a character set name or
alias known to java.nio.charset.Charset.forName
method. If forName
throws an exception for the
given charset
name, then eBus will quietly
use UTF_8
instead. No exception will be thrown or
error logged.
All: @FieldDisplayIndex
eBus 5.3.2 introduced the annotation
@FieldDisplayIndex
which is used to define
field ordering for display purposes. The problem
is that
MessageType.fields()
returns fields in serialization order. This
ordering is meant for superior serialization performance
but this ordering does not make human sense. This is
corrected by
MessageType.displayFields()
.
The returned field ordering is from base class
EMessageObject
fields down to
the leaf class. Meaning super class fields are listed
first, the leaf message class last.
Usage: @FieldDisplayIndex(index = n)
where n
≥ zero. Field display indices do not
have to start at zero or have to be strictly sequential but
must be in increasing order. If multiple field display
indices have the same value, the ordering is undefined
among those fields.
Message Keys
eBus subjects
are
identified by a
message key
. This
key consists of a message class and String
subject. This is also known as type+topic
referencing.
An example message key is:
(com.acme.ProductUpdate.class
, )
where com.acme.ProductUpdate
is a class extending
EMessage
.
There is no eBus-defined subject format. Message subject format (if any) is left entirely up to the application developer. The above example could be:
- (
com.acme.ProductUpdate.class
, ) - (
com.acme.ProductUpdate.class
, ) - (
com.acme.ProductUpdate.class
, )
Message Key Dictionary
eBus release 4.5.0 allows applications to add message keys to and retrieve keys from the message key dictionary. Prior to release 4.5.0, message keys were added to the dictionary indirectly by opening publisher, subscriber, and reply feeds. Even then, this dictionary was not accessible' to applications.
Notification and request message keys are added using the
methods
EFeed.addKey(EMessageKey)
and
EFeed.addAllKeys(Collection<EMessageKey>)
.
EFeed.addKey
puts a single message key into the
dictionary while EFeed.addAllKeys
adds multiple
keys to the dictionary. Note that both calls do not replace
or overwrite keys that are already in the dictionary. If
EFeed.addKey
is called with a message key that is
already in the dictionary, then the method does not update
the existing message key entry. Attempting to add reply or
system message keys results in an
IllegalArgumentException
because eBus does not
store such keys in the message key dictionary.
Message keys are retrieved from the dictionary using the
methods
EFeed.findKeys(Class<? extends EMessage>
and
EFeed.findKeys(Class<? extends EMessage>, Pattern)
.
The first call returns the all message keys associated with
the given message class. The second call extends that to
all keys for the message class and whose subject matches
the pattern. Both methods return a non-null list. If the
message class is either SystemMessage
or
ReplyMessage
subclass, then an empty list is
returned.
eBus provides the ability to store and load message key
dictionaries with the methods
EFeed.storeKeys(ObjectOutputStreawm)
,
EFeed.storeKeys(Class<? extends EMessage>, ObjectOutputStream)
,
and
EFeed.storeKeys(Class<? extends EMessage>, Pattern, ObjectOutputStream)
.
The first method stores the entire message key dictionary
to the object output stream. The next two, like
EFeed.findKeys
, stores only those keys which
match the message class and (optional) subject pattern.
The stored message key stream is re-loaded with
EFeed.loadKeys(ObjectInputStream)
.
Like EFeed.addAllKeys
, this method does not replace
or overwrite existing message keys.
The purpose for the storeKeys
and loadKeys
methods is for applications to re-create the message key
dictionary quickly upon start up. And the importance of
this ability is to support
multi-subject feeds also
introduced in eBus release 4.5.0. Multi-subject feeds allow
an eBus client to open multiple notification or reply feeds
with a single feed.
Notification
Notification messages follow the advertise/subscribe
paradigm together with type+topic-based routing.
A notification feed is referenced by
message key
. A
publisher opens a
publish feed
for
a given message key and then
advertises
the capability of sending a notification message.
When a publisher is capable of publishing the advertised
notification message, it calls
update feed state
with an
up feed state
.
A subscriber informs eBus of interest in a notification
message feed by opening a
subscribe feed
for the same message key and then
subscribes
to the notification feed.
eBus subject
sits
in the middle, tracking advertisements and subscriptions
for each unique message key and deciding when the
publishers should start and stop publishing based on the
subscriber presence. The following table defines the states
between publishers and subscribers:
0 Publishers | > 0 Publishers | |
---|---|---|
0 Subscribers | No Feed | Stop publisher feed |
> 0 Subscribers | Inform subscribers there are no publishers | Start publisher feed |
eBus uses advertisements and subscriptions to track when to:
- Start a feed because there is a subscriber wanting to receive the notification.
- Stop a feed because there are no more subscribers.
- Inform subscribers when a notification feed is up and to expect notifications or the feed is down and so there will be no notifications forthcoming.
Publishers and subscribers may retract their advertisements
and subscriptions, respectively, at any time by calling
EPublishFeed.unadvertise()
and
ESubscribeFeed.unsubscribe()
for
a particular advertisement or subscription. Both publish
and subscribe feeds are still alive and may be
re-advertised and re-subscribed. When an application now
longer needs a feed, it should call
EFeed.close()
which permanently
disposes the feed. The application should no longer
reference a feed after closing it.
If a publisher experiences a problem preventing updates to
a specific message key feed, the publisher can inform
subscribers that the feed is down by calling
feed.updateFeedState(EFeedState.DOWN)
.
When the problem clears, the publisher calls
feed.updateFeedState(EFeedState.UP)
, informing
the subscribers that the feed is back up.
Note: a subscriber will see a
EFeedState.DOWN
feed state until a publisher both
advertises the feed and calls
feed.updateFeedState(EFeedState.UP)
. eBus does not
assume that a publisher can publish notifications simply
because it advertised the feed.
ENotification Fields
eBus release 5.6.0 introduced two optional fields to the
ENotificationMessage
class:
publisherId
and
position
.
publisherId
may be used to uniquely identifier the
EPublisher
instance responsible for generating a
particular notification instance. It is the application's
responsibility to define the meaning and uniqueness of
publisher identifiers.
Position is used to define message ordering when
notification messages are published at a rate faster than
wallclock time granularity. In other words, there is more
than one message per timestamp. If these message are
persisted then message ordering may be lost upon
retrieval. The position
field may be used an an
index within a given timestamp to guarantee correct
ordering when retrieved from persistent store.
Combining subject
, timestamp
,
publisherId
, and position
guarantees a
unique index for any notification message placed into
persistent store if publisher identifier and
message position are correctly used.
Request, Reply
eBus uses the advertise/request/reply paradigm to forward request messages to advertised repliers. Replies are sent directly back to the requestor. A replier advertises the ability to reply to a request by:
-
building
a reply feed and -
advertising
the feed. -
replyFeed(EFeedState.UP) marking
the feed as available.
type+topic message key
.
A replier can
unadvertise
and re-advertise any number of times. But once the feed is
closed
, it may not
be used again. Instead, a new feed must be opened.
A requester sends a request message by:
-
building
a request feed and -
posting
the request message.request(msg)
method returns the resulting request state. If there are no matching repliers for the request message, then anIllegalStateException
is thrown. Otherwise, returns anERequestFeed.ERequest
instance which the requestor may use to cancel the request by callingERequestFeed.ERequest.close()
.
The replier receives the request via the method
EReplier.request(request)
.
The replier sends a reply message via request
by
calling
ERequestFeed.ERequest.reply(msg)
.
The replier may send multiple replies for the same request
by setting the reply status to
EReplyMessage.ReplyStatus.OK_CONTINUING
for all but the final reply. The last reply has the status
EReplyMessage.ReplyStatus.OK_FINAL
.
If the request cannot be successfully handled, then the
EReplyMessage.ReplyStatus.ERROR
reply status should be sent. An error reply is also a final
reply and no further replies may be sent afterwards. Also,
an error reply may be sent even after
ReplyStatus.OK_CONTINUING
replies but not
after ReplyStatus.OK_FINAL
.
Replies are sent back to the requestor via the method
ERequestor.reply(remaining, message, request)
where the first argument, remaining
, specifies the
number of repliers which have not yet finished replying to
the request. When remaining
is zero, then this is
the final reply and the request is finished. The request
feed state is retrieved by calling
ERequestFeed.ERequest.requestState()
.
The requestor may cancel a request any time prior to request completion. For a detailed explanation regarding request cancelation see Canceling an Active Request.
Request message definitions have an annotation:
@EReplyInfo
. This
class-level annotation lists one or more reply message
classes which may be sent in response to the request
message. @EReplyInfo
annotations are cumulative.
This means that the list of allowed reply message classes
includes those in this request message @EReplyInfo
class list and those in the super class @EReplyInfo
class list. Since EReplyMessage
is in
ERequestMessage
@EReplyInfo
annotation, and
because ERequestMessage
is the base class for all
request message classes, EReplyMessage
is a valid
reply message for all requests.
In the following example, OrderReply
and
EReplyMessage
messages may be sent in reply to an
OrderRequest
but not an Invoice
reply
message.
import net.sf.eBus.messages.EReplyInfo;
import net.sf.eBus.message.ERequestMessage;
@EReplyInfo (replyMessageClasses = {OrderReply.class}, mayClose = false)
public final class OrderRequest
extends ERequestMessage
Canceling an Active Request
eBus release 5.6.0 introduced a new EReplyInfo
optional attribute: mayClose
. If true
(which is the default setting) then a request may call
ERequestFeed.ERequest.close()
which immediately and
completely closes the request which repliers can do nothing
about. In the above example OrderRequest
cannot
be unilaterally closed. So how then may an
OrderRequest
be canceled?
Release 5.6.0 introduced a new method:
ERequestFeed.ERequest.cancel()
which is an
optional cancel request. A new parameter
boolean mayRespond
was added to
EReplier.cancelRequest
method. If this flag is
set to true
the replier may respond to the
cancel request and either accept or reject the
cancellation. If set to false
this is a unilateral
cancellation and the replier may not respond.
If the replier may respond, that response is generally an
EReplyMessage
with the reply status set to either:
-
EReplyMessage.ReplyStatus.CANCELED
: replier accepts the cancel and the request is now terminated (for the replier) or -
EReplyMessage.ReplyStatus.CANCEL_REJECT
: replier rejects the cancellation and is still working the request. More replies may be expected from this replier.EReplyMessage.replyReason
should contain text explaining why the cancel request is rejected.
While a reply is not required if the cancellation is
rejected it is a good idea to send back a
ReplyStatus.CANCEL_REJECT
to let the requester
know that this is the case.
Please note that whether optionally cancel()
ing or
unilaterally close()
ing a request, the requester
may still receive replies while the cancellation process
completes.
3
eBus Roles
Objects need to implement one or more of the client interfaces in order to interact with eBus:
Role | Message |
---|---|
Subscriber | Receive Notification |
Publisher | Send Notification |
Requester |
Send Request
Receive Reply |
Replier |
Receive Request
Send Reply |
RequestMonitor | Receive requests, replies, and cancellations sent to and from local repliers. |
Which interface(s) should be implemented depends on which eBus message types the object wants to send or receive. An application class may implement one or all four eBus interfaces.
Subscriber
The subscriber
interface is implemented so a client may receive
notification messages. The
subscriber life cycle consists of:
-
Opening a feed: call
ESubscribeFeed.builder()
obtaining a subscriber builder instance. Configure the subscribe feed via theBuilder
methodstarget
,messageKey
,scope
,condition
,statusCallback
, andnotifyCallback
. The subscription condition is optional and may benull
. If provided, then only those notifications matching the condition are forwarded to the subscriber. IfstatusCallback
andnotifyCallback
are not defined, then theESubscriber
method overrides are used.Subscribe feed instance is created by calling
Builder.build()
method. -
Subscribing to a subject: call
ESubscribeFeed.subscribe()
to start receiving the messages from the notification feed. -
Receive feed state updates: eBus informs the
subscriber of the subject's feed status via the
feedStatus(feedState, feed)
callback. If there are no publishers, thenfeedState
isdown
. Otherwise,feedState
isup
and the subscriber can expect notifications from the feed. This method may be replaced usingESubscriber.Builder.statusCallback
method. -
Receive notification messages: eBus passes the
latest subject notification to a subscriber via the
notify(message, feed)
callback. This method may be replaced usingESubscriber.Builder.notifyCallback
method. -
Unsubscribe from subject: call
ESubscribeFeed.unsubscribe()
. Once unsubscribed, the subscriber can re-subscribe by callingESubscribeFeed.subscribe()
again. -
Closing a feed: call
EFeed.close()
. If the subscription is in place when the subscribe feed is closed, the subscription is automatically retracted. Once a feed is closed, it may not be used again and should be dropped by the application.
Publisher
The publisher
interface is implemented so a client may publish
notification messages. The
publisher life cycle is:
-
Opening a feed: call
EPublishFeed.builder()
obtaining a publisher builder instance. Configure the feed target, message key, and scope. You can also override the default status callback method in the builder. Publish feed instance is created by callingBuilder.build()
method. -
Advertising a subject: call
EPublishFeed.advertise()
to register the ability to publish the given message key. -
Update the feed state: call
EPublishFeed.updateFeedState(EFeedState)
to inform eBus of the publisher's ability to publish notifications on this feed. If the publisher is capable of publishing this notification, pass inup
; otherwise pass indown
. -
Waiting for a publish status update: when the
first subscriber for the notification feed arrives, eBus
calls
publishStatus(feedState, feed)
method withEFeedState.UP
. The publisher is now clear to start sending notification messages on the feed. When there are no more subscribers for the feed,publishStatus
is called with aEFeedState.DOWN
. The publisher should now stop posting notification messages to the feed. This mechanism results in notifications being posted to the feed only when there are subscribers registered to receive the message. -
Publishing notifications: notification messages
are sent via
EPublishFeed.publish(notification)
. -
Unadvertise when done: call
ESubject.unadvertise()
to retract the advertisement. Once unadvertised, the publisher may re-advertise the notification feed. -
Closing a feed: call
EFeed.close()
. If the advertisement is in place when the publish feed is closed, the advertisement is automatically retracted. Once a feed is closed, it may not be used again and should be dropped by the application.
Requestor
The requestor
interface is implemented so a client may send requests to
matching repliers
. The
requestor life cyle is:
-
Opening a feed: call
ERequestFeed.builder()
obtaining a request feed builder instance. Use this builder to set the requestor, message key, and feed scope. The request feed instance is created by callingBuilder.build()
method. -
Subscribing to a feed: call
subscribe()
. Once subscribed, wait untilfeedStatus(EFeedState.UP, feed)
is called, signifying that there is a replier for the request feed. -
Post a request: call
request(message)
. This method returns the resultingERequestFeed.RequestState
. IfACTIVE
is returned, then the request has repliers and the requestor will receive replies to the request. IfDONE
is returned, then there are no repliers to this request and the requestor will receive no replies. -
Receive replies: eBus sends replies back to the
requestor via the
reply(remaining, reply, request)
callback wherereply
is the latest reply message andremaining
parameter specifies the number of repliers still sending replies. Whenremaining
is zero, then this represents the final reply. - Cancel a request: see Canceling an Active Request for a detailed explanation regarding request cancellation.
-
Unsubscribe: call
subscribe()
when finished posting requests to this feed.
ERequestFeed
may be used
only once. A new ERequestFeed
instance must be
opened for each new request.
Replier
The replier
interface
is implemented so a client may respond to requests. The
replier life cycle is:
-
Opening a feed: call
EReplyFeed.builder()
obtaining a reply feed builder instance. Calling builder set methods configures the reply feed. Once configured callBuilder.build()
to create the reply feed instance. -
Advertise a subject: call
advertise()
to register the ability to reply to the given request message key. -
Setting feed status: call
updateFeedState(EFeedState.UP)
to inform requestors that this replier is ready to handle requests. If a replier is temporarily prevented from responding to requests, then set the feed state toEFeedState.DOWN
. When the issue is resolved, then mark the feed stateUP
. -
Wait for requests: requests are passed to this
replier via the
EReplier.request(request)
callback. The replier does not have to reply immediately but if replying asynchronously, thenrequest
must be stored away for later use sincerequest
is used to send replies. TheEReplyFeed
may have receive multiple requests over time but an individual request is contained in anERequest
instance. -
Respond to requests: a replier sends a reply
message via
reply(msg)
. -
Handle canceled requests: see
Canceling an Active Request
for detailed explanation about how to handle a call to
EReplier.cancelRequest(request, mayRespond)
. -
Unadvertise when done: call
EReplyFeed.unadvertise()
. Once un-advertised, a reply feed may be advertised again. -
Closing a feed: call
EFeed.close()
. If the advertisement is in place when the reply feed is closed, the advertisement is automatically retracted. Once a feed is closed, it may not be used again and should be dropped by the application.
Request Monitor
Request monitor
interface is implemented to monitor local repliers
are processing requests in a timely fashion. The request
monitor cycle is:
-
Opening a feed: call
ERequestMonitorFeed.builder()
obtaining a request monitor feed builder instance. Calling builder set methods configures request monitor feed. Once configured callBuilder.build()
to create request monitor feed instance. -
Monitor a request subject: call
ERequestMonitorFeed.monitor()
to begin monitoring the configured request messages posted to local repliers and replies from those repliers back to the requesters. -
Wait for requests, replies, and cancellations:
request messages, reply messages, and cancellations are
passed back to
ERequestMonitor
interface methods.
Please note that unlike the other four roles there is no feed status callback. This is because a request monitor is not a part of a request/reply conversation but standing off to the side watching the conversation. -
Unmonitor when done: call
ERequestMonitorFeed.unmonitor()
. Once monitoring is stopped, a request monitor may start monitoring again on the feed. -
Closing a feed: call
EFeed.close()
. If monitoring is in place when the request monitor feed is closed, the monitoring is automatically stopped. Once a feed is closed, it may not be used again and should be dropped by the application.
Lambda Expressions Callbacks
By default, eBus passes messages to application objects by
calling the appropriate role interface method, like
ESubscriber.notify(ENotificationMessage, ESubscribeFeed)
.
This is fine if a subscriber opens only a single feed. But
if the subscriber opens multiple feeds, each feed's
messages are posted to ESubscriber.notify
. The
notify
method then becomes a switch, untangling the
inbound notifications and routing each message to its
destination method where the real work is performed.
In this case, ESubscriber.notify
is pure overhead.
eBus v. 4.2.0 fixes this problem by allowing applications
to associate callback code with a feed. Application objects
are still required to
implement role interface
but do not
have to override the role interface methods. Instead, the
application registers callback code using Java lambda
expressions. The steps for putting a notification
subscription in place is now:
import net.sf.eBus.client.ECondition;
import static net.sf.eBus.client.EFeed.FeedScope;
import net.sf.eBus.client.EFeedState;
import net.sf.eBus.client.ESubscribeFeed;
import net.sf.eBus.client.ESubscriber;
import net.sf.eBus.client.FeedStatusCallback;
import net.sf.eBus.client.NotifyCallback;
public class MyAppClass implements ESubscriber {
// ESubscriber interface methods are not overridden.
private ESubscribeFeed mFeed = null;
public void startup() {
EMessageKey messageKey = new EMessageKey(CatalogUpdate.class, "Spring"); // Receive updates to Spring catalog.
// The subscription condition may now be defined using a lambda expression.
// Only updates to the catalog camping section are accepted.
mFeed = (ESubscribeFeed.builder()).target(this)
.messageKey(messageKey)
.scope(FeedScope.REMOTE_ONLY)
.condition(m -> ((CatalogUpdate) m).section == CatalogSection.CAMPING))
.statusCallback((feedState, feed) -> {
if (feedState == EFeedState.DOWN) {
// Handle lost feed.
}
else {
// Handle gained feed.
}
})
.notifyCallback((msg, feed) -> {
// Process latest message.
})
.build();
feed.subscribe();
}
public void shutdown() {
if (mFeed != null) {
mFeed.close();
mFeed = null;
}
}
}
As of eBus release 7.1.0, notify callback may be declared
using the target notification message type rather than
ENotificationMessage
. Please note that the method
may be private unlike overriding
notification(ENotificationMessage, IESubscribeFeed)
.
private void onUpdate(final CatalogUpdate msg, final IESubscribeFeed feed) { ... }
Failure to override the role interface methods
and put the role callbacks in place
results in eBus throwing an ValidationException
when building the feed.
Hybrid Object Pattern
When using eBus in a major application the central eBus objects will contain so many feeds that the object becomes convoluted and difficult to understand. As an example consider an eBus object implementing a trading algorithm. This object implements the following roles:
-
EReplier
: receives algo requests to buy or sell an instrument using the algorithm's logic. -
ERequestor
: algorithm places order on one or more exchanges in order to satisfy its own request. -
ESubscriber
: algorithm subscribes to market data, dynamic configuration changes to algorithm parameters, instrument and exchange dynamic performance statistics used to guide placing exchange orders. -
EPublisher
: algorithm publishes its own dynamic performance statistics.
Placing all these feeds and associated data members into a single eBus object results in code confusion, making it difficult to separate the essential algorithm data and logic from subsidiary information supporting the algorithm. This section describes a hybrid object pattern useful for reducing this code confusion.
Firstly, what is a hybrid object? There are two types of
classes: active and passive. A passive object does not
initiate actions but is only acted upon. Example passive
classes are java.util.ArrayList
or
java.time.Duration
. Instances of these classes do
nothing until other code initiates a method call to the
instance. A passive object is contained within an active
object.
An active object can initiate action by sending an eBus message. An active object does not need to wait to be acted upon before acting. An active object does not exist within another object, standing alone within an application.
A hybrid object lies between active and passive. A hybrid object sends and/or receives messages but is contained within an active object. This active object treats the hybrid object as a passive object, interacting with the hybrid object using method calls. Please note that hybrid objects should not be shared among active objects.
Back to the trading algorithm active object. The following example concentrates on the market data hybrid object only but can be readily expanded to handle the feeds as well.
public final class MakeMoneyAlgo
implements EReplier, ERequestor, ESubscriber, EPublisher {
// Hybrid objects created on active object start up.
private MarketData mMktData;
private MakeMoneyConfig mAlgoConfig;
...
// Receives order requests on this
private EReplyFeed mOrderFeed;
public MakeMoneyAlgo(...) { ... }
@Override public void startup() {
// Create hybrid instances and then have hybrid create its feeds.
// Note that this MakeMoneyAlgo reference is passed to hybrid object
// constructor.
mMktData = new MarketData(this, ...);
mMktData.startup();
mOrderFeed = (EReplyFeed.builder()).target(this)
.messageKey(order request key)
.scope(EFeed.FeedScope.REMOTE)
.requestCallback(this::onOrder)
.build();
mOrderFeed.advertise();
mOrderFeed.updateFeedState(EFeedState.UP);
}
private void onOrder(final EReplyFeed.ERequest request,
final EReplyFeed feed) {
final OrderRequest newOrder = (OrderRequest) request;
final OrderBook book =
mMktData.getOrderBook(newOrder.instrument, mAlgoConfig.getBookDepth());
// Validate and process order request using information retrieved from hybrid
// objects.
}
}
Note that MakeMoneyAlgo
implements the
EReplier
, ERequestor
, ESubscriber
,
and EPublisher
interfaces and not the
hybrid objects. This is because only active
objects may implement eBus role interfaces. This includes
the EObject
interface.
Passing the MakeMoneyAlgo
reference to the
MarketData
constructor is key to any hybrid object:
public final class MarketData {
// MakeMoneyAlgo instance containing this hybrid object.
private final MakeMoneyAlgo mOwner;
// Market data subscription made on start up.
private ESubscribeFeed mMktDataFeed;
// Place order book data members here.
public MarketData(final MakeMoneyAlgo owner, ...) {
mOwner = owner;
...
}
public void startup() {
// Note: hybrid feed uses MakeMoneyAlgo reference as target but uses
// MarketData methods for feed status and message delivery.
// This is what makes MarketData a hybrid object.
mMktDataFeed = (ESubscribeFeed.builder()).target(mOwner)
.messageKey(market data message key)
.scope(EFeed.FeedScope.REMOTE)
.statusCallback(this::onFeedStatus)
.notifyCallback(this::onMarketData)
.build();
mMktDataFeed.subscribe();
}
public OrderBook getOrderBook(final Instrument instrument, final int depth) {
// Returns instrument's order book up to the given depth. MakeMoneyAlgo uses
// this order book combined with other parameters to decide what orders to
// place on what exchanges.
}
private void onFeedStatus(final EFeedState state, final IESubscribeFeed feed) {
// Place feed status update code here especially when market data feed goes
// down.
}
private void onMarketData(final ENotificationMessage msg, final IESubscribeFeed feed) {
// Update market data members with latest update.
}
}
See Dispatcher section for why hybrid object feeds must target its active object owner.
eBus Feeds
Simple Feeds
Simple feeds 1) contain a single message key and 2) connect subscribers/requestors to all the simple publisher/replier feeds that exist for the same message key. What a publisher posts to simple publish feed is forwarded to all existing and subscribed simple subscription feeds. Similarly, requests posted to a simple request feed are forwarded to all existing and advertised simple reply feeds. Messages are exchanged between simple feeds only when both sides are open and in place (advertised or subscribed).
The above roles section showed
application classes interacting with simple eBus feeds:
EPublishFeed
,
ESubscribeFeed
,
ERequestFeed
, and
EReplyFeed
.
Builder
class
for EPublishFeed
, ESubscribeFeed
,
ERequestFeed
, and EReplyFeed
feed classes.
Feed builders are now the preferred technique for creating
single feed instances. The open
methods are now
deprecated and will be removed in a future release.
Multi-Subject Feeds
Multi-Subject feeds act as a proxy between the application object and multiple, subordinate simple feeds. The multi-subject feed opens, configures, advertises/subscribes, and closes these subordinate feeds in unison. The multi-subject feed makes sure the subordinate feeds are in the same state at the same time.
But it is the subordinate simple feeds which interact with the application object. The multi-subject feed opens a subordinate simple feed, passing in the application object as the feed client. That means the subordinate feed calls back to the application object. If the application object creates a multi-subject feed with 1,000 subordinate feeds, then the application object receives callbacks from 1,000 subordinate feeds.
Multi-subject feeds behave in a similar manner to simple feeds. They are opened, may have callbacks configured, advertised/subscribed, un-advertised/un-subscribed, and closed. Note the multi-subject feed configures the callback methods for the subordinate feeds based on how the multi-subject feed is configured. This means that each subordinate feed calls back to the same method. It is not possible for subordinate feeds belonging to the same multi-subject feed to call back to different methods.
Multi-subject feeds use the same role interfaces as simple feeds:
Multi-Subject Feed | Role |
---|---|
Multi-Subject Publish Feed |
EPublisher |
Multi-Subject Subscribe Feed |
ESubscriber |
Multi-Subject Reply Feed |
EReplier |
Multi-Subject Request Feed |
ERequestor |
New subordinate feeds may be
added to
or
removed from
an open multi-subject feed by specifying the message
subject to be added or removed. The newly added subordinate
feed references the same message class as the other feeds.
Multi-subject feeds are homogenous with respect to message
class but heterogenous with respect to message
subject. A
subscriber
opens and
subscribes a
multi-subject subscribe feed
,
all messages received from the subordincate feeds will be
the same message class. It is not possible to open a
multi-subject feed for more than one message class.
Note that
multi-subject feeds
are not eBus feeds
. This
is because multi-subject feeds are proxy feeds, posing as
an interface between application objects and eBus feeds.
Pattern Feeds
eBus release 4.6.0 introduced pattern feeds. A pattern feed
watches one or more notification feeds and reports a
MatchEvent
when events arriving
on those feeds match the given pattern. The best way to
explain how this works is by example.
A stock market algorithm reacts to the following trade pattern in stock ABCD:
- The pattern starts with a trade quantity > 1,000 shares.
- All subsequent trades must be at a price > than the average price of all previous trades. So the price is trending up. The quantity may increase or slowly decrease.
- The pattern ends with a trade quantity < 0.8 * the previous trade quantity. So a drop in traded quantity marks the end of the trading spurt.
- The time difference between the first and final trade must be ≤ 1 hour.
- Events which are used in one match may not be used in another match. In other words, there is no intersection between event sets for any two different matches from the same pattern. Exclusivity does not hold across patterns.
An eBus pattern is created using a
EventPattern.Builder
. A pattern
consists of two parts: 1) notification feed(s) (known as
parameters) and 2) the pattern. Parameters can either be
built or provided to the builder as a java.util.Map
.
This example builds the parameter. See
EventPattern.Builder
for an example of a parameter
Map
.
Event patterns come in two flavors: ordered and unordered.
This example is ordered since events must arrive in a
specified order. Again, see
EventPattern
for an example of an
unordered pattern.
An event pattern builder is created as follows:
EventPattern.Builder builder = EventPattern.builder("/stock/pattern/TradeBlip/ABCD", EventPattern.PatternType.ORDERED);
where the first argument "/stock/pattern/TradeBlip/ABCD" is the
pattern name. This name is used as the MatchEvent
subject. Duplicate pattern names are allowed. The second
argument tells the builder to build an ordered pattern. The
optional third argument is the parameter map.
The first step is parameter map definition. Since the
pattern uses only equity trade notifications, there is one
parameter. The parameter definition is the information
needed to
build a notification subscription feed
.
final String param = "t";
builder.beginParameterMap() // start parameter map definition.
.beginParameter(param) // define parameter "t"
.messageKey(new EMessageKey(EquityTradeMessage.class, "/stock/trade/ABCD"))
.scope(EFeed.FeedScope.REMOTE_ONLY)
.endParameter() // parameter "t" defined.
.endParameterMap(); // all parameters defined.
With the parameter map defined, it is time to define the
pattern components. The first component is a trade with a
quantity > 1,000. The match condition takes two
parameters: the latest event (t
) and the current
groups map (u
).
Like Java regular expressions, matched elements may be
stored within a named group. These groups are stored in a
Map<String, List<ENotification>>
which maps the group name to its associated list of matched
events. A groups map always contains the group
EventPattern.ALL_EVENTS
. This
group contains all matched events.
builder.singleComponent(param)
.matchCondition((t, u) -> (((EquityTradeMessage) t).trade).size > 1_000)
.endSingleComponent();
The second component is for the trade price trending up and
the trade quantity staying at a high level. These trades
are captured in group "g2". Note that multiple
trades may satisfy this condition. By default the match
count is exactly one event. Method avgPx
is a
user-defined method calculating the average trade price
across all previously matched trade events.
builder.beginGroup("g2")
.singleComponent(param)
.matchCount(1, Integer.MAX_VALUE)
.matchCondition((t, u) -> ((((EquityTradeMessage) t).trade).price).compareTo(avgPx(u.get(EventPattern.ALL_EVENTS))) > 0)
.endSingleComponent()
.endGroup("g2");
The final component detects a 20% drop in trade quantity, marking the end of the upward moving trade blip.
builder.singleComponent(param)
.matchCondition((t, u) ->
{
final EquityTradeMessage currTrade = (EquityTradeMessage) t;
final List<ENotificationMessage> trades = u.get(EventPattern.ALL_EVENTS);
final EquityTradeMessage prevTrade = (EquityTradeMessage) trades.get(trades.size() - 1);
return ((currTrade.trade).size < (int) (0.8 * (prevTrade.trade).size));
})
.endSingleComponent();
The one hour time limit is enforced by the until
condition. Note that the until
condition does
not have to be time-based at all. It may be based on any
event property.
builder.until((t, e) ->
{
// An empty event list means that the time limit is not exceeded.
boolean retcode = t.isEmpty();
if (!retcode) {
final EquityTradeMessage first = (EquityTradeMessage) t.get(0);
final long duration = (e.timestamp - first.timestamp);
retcode = (duration ≤ COMPLEX_TIME_LIMIT);
}
return (retcode);
});
The following code declares that each pattern match has
exclusive use of its matching events. The default setting
is false
.
builder.isExclusive(true)
With the pattern defined, the EventPattern
is
constructed with the build
method.
final EventPattern pattern = builder.build();
A pattern feed is opened just like any other feed type:
final EventPattern patFeed = EPatternFeed.open(subscriber, pattern);
patFeed.subscribe();
where subscriber
is an ESubscriber
instance, just as in an ESubscribeFeed
. The feed
status and notification methods can be either the
standard ESubscriber.feedStatus
and
ESubscriber.notify
interface methods or
lambda expression callbacks set via
EPatternFeed.statusCallback
and
EPatternFeed.notifyCallback
methods.
A pattern feed state is up only when all the subordinate parameter feeds are up.
When events match a pattern, a
MatchEvent
is posted to the
subscriber just like any other event.
4
The match event in this example is posted to the message
key MatchEvent:/stock/pattern/TradeBlip/ABCD
.
A match event contains only one field: the group map. A
helper method
List<ENotificationMessage> group(String name)
is provided which returns the notification list associated
with the given group name. The group map and the
notification lists in that map are read-only.
In this example, the group map contains two entries:
EventPattern.ALL_EVENTS
and "g2". The "g2" group
contains the one or more trade events which occurred
between the first and last events.
Historic Feeds
eBus release 6.3.0 introduced historic feeds. An historic
feed allows subscribers to retrieve notification messages
published in the past and into the future in a seamless
manner. Historic publishers use a message store to persist
and retrieve notification messages. The
net.sf.eBus.feed.historic
package introduces the
following interfaces:
-
IEHistoricPubliser
: implement this class to both publish notification messages to active subscribers and persist those messages. -
IEHistoricSubscriber
: implement this class to receive both past and future notification messages in a seamless manner. -
IEMessageStore
: implement this message to provide notification message persistence and retrieval.
There are two historic feeds:
EHistoricPublishFeed
and
EHistoricSubscribeFeed
.
These feeds are used to publish/persist and receive
notification messages respectively. These feeds do
not implement the
IEPublishFeed
or
IESubscribeFeed
interfaces (see below). Instead, the historic feeds are
implemented as hybrid objects
and run on the same dispatcher
as their respective historic publisher/subscriber. The
historic publish feed not only publishes live notification
messages but also handles requests for historic messages.
Conversely, the historic subscribe feed subscribes to live
notification messages and requests past messages, combining
both historic and live messages as a single stream to the
historic subscriber.
eBus release 7.2.0 introduces two IEMessageStore
implmentations:
InMemoryMessageStore
and
SqlMessageStore
.
See the javadocs for each to learn how to use these message
stores with an EHistoricPublishFeed
.
See the net.sf.eBus.feed.historic
package docs and
the programmer's manual section entitled "Don't know must
about history" for a detailed explanation on using
historic feeds.
Feed Interfaces
Because multi-subject feeds does not extend EFeed
,
it is not possible to store single- and multi-subject feeds
in a common way - until now. eBus release 4.5.2 introduced the
following interfaces:
IEPublishFeed
,
IESubscribeFeed
,
IERequestFeed
, and
IEReplyFeed
. These
interfaces extend IEFeed
.
The publish, subscribe, request, and reply single- and
multi-subject feeds implement the matching feed interface.
These feed interfaces allow applications to store feed
references in a common way. If an eBus client opens a mix
of single- and multi-subject subscriptions, then the client
can store these feeds in a common
List<IESubscribeFeed>
, not forced to
handle single- and multi-subject feeds separately.
Dispatching Messages to Application Objects
eBus wraps application objects in a
weak-reference client
.
This client tracks the application object's extant
feeds
, undelivered
message queue, and the run queue associated with the
application class. The run queue ties the application
object to the eBus dispatcher.
An application can configure eBus with multiple run queues. Each run queue is associated with one or more application classes where a class may appear in only one run queue class set. In other words, the run queue class sets are non-intersecting. One run queue is designated as the default and has an empty class set. An application class which does not appear in any specific run queue class set is assigned to the default run queue.
Each one run queue has one or more dispatcher threads associated with it. The threads are given the same, user-configurable priority. Each thread polls for the next ready client on the run queue. When a client is acquired, the dispatcher thread posts queued messages to the application object (more detail on this below). The run queue is application-configured to poll in four ways:
- Blocking: the run queue uses a lock and condition, blocking on the condition until a client is posted on the run queue and the condition is signaled.
- Spinning: the run queue spins in a tight loop until a client is posted.
-
Spin+Park: the run queue alternates
between spinning and
parking
while waiting for a client posting. The application may configure the spin limit and nanosecond park time. -
Spin+Yield: the run queue alternates
between spinning and
yielding
while waiting for a client posting. The application may configure the spin limit.
Putting this all together, it means that application
objects are referenced by only one dispatcher thread at a
time, making eBus message delivery single-threaded. If an
application object is referenced solely by eBus
dispatcher threads, then that application object is
effectively single-threaded. That application object does
not need to used synchronized
,
java.util.concurrent.locks.Lock
, atomics, etc. to
protect critical sections because a single-threaded object
has no critical sections.
(Note: the above does not apply to application objects which may be referenced by non-dispatcher threads.)
The basic dispatcher thread run()
method is:
public void run() {
while (true) {
acquire next available client (blocking)
while (client has pending messages && client remaining quantum > 0) {
startTime = System.nanoTime();
deliver next message to client;
delta = (System.nanoTime() - startTime);
update client remaining auantum;
}
if (client remaining quantum ≤ 0) {
set client quantum to run queue's max quantum.
}
if (client has pending messages) {
append client to run queue.
}
}
}
Each run queue has a configurable maximum run quantum. Each
client is assigned this maximum run quantum when
instantiated. This quantum prevents a single client from
dominating a run queue which the client shares. Note that
the client quantum is not used to preempt client
message processing. The client quantum is checked after
notify(msg, feed)
(for example) returns. If
notify(msg, feed)
goes into an infinite loop, then
the dispatcher thread making the call is lost to all other
application objects using that run queue.
A eBus client has four run states:
- IDLE: the eBus client has no pending messages and is not posted to the run queue.
- READY: the eBus client has pending messages and is posted to the run queue.
- RUNNING: a dispatcher thread acquired the eBus client and is dispatching the client's pending messages to that client. This will continue until the client has no more pending messages or quantum.
- DEFUNCT: the underlying application object is finalized. Any and all extant client feeds are automatically retracted when this state is entered.
Active and Hybrid Objects
The reason why hybrid object feeds must target their parent active object should now be apparent: because the hybrid object shares its parent active object's Dispatcher. While the hybrid object's method process the message, that message is posted to the active object's queue. So the active object remains single threaded but subordinate message processing to its hybrid objects.
(See hybrid object section for more about active and hybrid objects.)
Starting and Stopping Application Objects
When an object opens feeds, there is a race condition because eBus may call the object back while that object is still opening feeds. Consider the following method which often occurs in eBus applications:
public void start() {
mSubFeed = ESubscribeFeed.open(this, subKey, FeedScope.LOCAL, null);
mSubFeed.subscribe();
// Race condition between feedStatus callback and opening, advertising publish feed.
mPubFeed = EPublishFeed.open(this, pubKey, FeedScope.LOCAL);
mPubFeed.advertise();
}
@Override public void feedStatus(EFeedState feedState, IESubscribeFeed feed) {
// mPubFeed may still be null or unadvertised causing the following line to fail.
mPubFeed.updateFeedState(feedState);
}
The problem is that the subscription feed status callback may
occur before the publish feed is opened and advertised. The
simplest solution is switch the subscribe and publish feed
opening. But what if the example opened more feeds with a
more complex relation between feeds. It may be difficult (if
not impossible) to open the feeds in the correct order.
The next solution is to synchronize
the
start
and feedStatus
methods.
That would prevent the eBus callback from interrupting the
object start. But that would add overhead to the
feedStatus
callback which is only needed for
the one-time object start.
So the true problem is: how to prevent eBus callbacks from interrupting an object start. eBus v. 4.3.1 offers a solution: have the object start occur on an eBus Dispatcher thread.
If the object is starting on an eBus Dispatcher thread,
then any new eBus callbacks will be delivered
after the object start returns. This is done by
changing start
to
@Override public void startup()
This method is executed on the eBus Dispatcher thread by:
-
Registering the object with eBus:
EFeed.register(object)
and then -
Having eBus start up the object:
EFeed.startup(object)
Configuring Dispatchers
Dispatchers are be set up when an eBus application starts
and only then. Dispatchers can only be
configured using a properties file and referencing that
file in the command line parameter
-Dnet.sf.eBus.config.jsonFile=<conf file path>
.
What follows is a sample properties file which defines two
Dispatchers: mdDispatcher
and
defaultDispatcher
. See the
eBus Programmer's Manual
section on Dispatchers for a detailed explanation of the
properties.
As of eBus release 6.0.0 Java properties are no longer supported with respect to eBus configuration. Only typesfafe JSON configuration files may be used.
// Two separate dispatchers are used: a spinning dispatcher for market data
// clients and a spin+park selector for all others.
dispatchers : [
{
name : mdDispatcher
numberThreads : 1
runQueueType : spinning
priority : 9
quantum : 10000ns
isDefault : false
classes : [ "com.acme.trading.MDHandler" ]
threadAffinity { // optional, selector thread core affinity
affinityType : CPU_ID // required, core selection type.
cpuId : 7 // required for CPU_ID affinity type
bind : true // optional, defaults to false
wholeCore : true // optional, defaults to false
}
},
{
name : defaultDispatcher
numberThreads : 8
runQueueType : "spin+park"
spinLimit : 2500000
parkTime : 1000ns
priority : 4
quantum : 100000ns
isDefault : true
}
]
eBus release 5.8.0 introduced the ability to create an
affinity between a thread and a particular CPU. This
affinity is created using the
OpenHFT Thread Affinity Library.
Please see the OpenHFT Thread Affinity documentation and
ThreadAffinityConfigure
for more information regarding using thread affinity.
Precision Timers
Event-driven applications use timers to trigger tasks at
a specific time. The problem is this means eBus objects
are executing on a timer thread (like
Timer
or
ScheduledExecutorService
) and
not on an eBus dispatcher thread - which means the eBus
object is no longer effectively single threaded.
The solution is for the timer task to dispatch another task to the eBus object's task queue. That re-directed task will be executed by an eBus dispatcher thread. The eBus object is still effectively single threaded.
eBus release 7.3.0 provides a new solution to this problem:
EScheduledExecutor
. The
scheduling methods are similar to
ScheduledExecutorService
but with one key
difference: the second argument is a
non-null EObject
:
-
EScheduledExecutor.schedule(Runnable task, EObject eobject, Duration delay)
-
EScheduledExecutor.scheduleAtFixedRate(Runnable task, EObject eobject, Duration initialDelay, Duration period)
-
EScheduledExecutor.scheduleWithFixedDelay(Runnable task, EObject eobject, Duration initialDelay, Duration delay)
When an EScheduledExecutor
timer expires, the timer
task is dispatched to the eBus object's task queue, and the
eBus object posted to the run queue (if necessary). The
redirection needed for an external Java timer disappears.
See EScheduledExecutor
for
detailed documentation on how to create an eBus scheduled
executor using
EConfigure.ScheduledExecutor
.
Like eBus dispatcher threads, eBus timers come in four
varieties: blocking, spinning, spin+park, and spin+yield.
eBus provides a default, blocking scheduled executor in
EClient.sCoreExecutor
. It is
strongly recommended that eBus objects use
EScheduledExecutor
for timed tasks since this
executor works within the eBus dispatcher framework.
Monitoring Dispatchers
eBus release 6.1.0 introduced access to dispatcher run-time
statistics by adding the static method
EClient.runTimeStats()
which
returns a EBusObjectInfo
list. A EBusObjectInfo
instance applies to a single
eBus EObject
and contains the following (all times
are in nanoseconds):
- eBus object name,
- eBus-assigned client identifier,
- minimum run time,
- maximum run time,
- total run time,
- number of times object was executed in Dispatcher,
- average run time,
- Dispatcher maximum quantum,
- number of times this eBus object exceeded Dispatcher maximum quantum, and
- current thread denial alarm condition.
Sample output of the Dispatcher run-time stats is:
ConnectionPublisher (id 0)
min run time: 1,364 nanos
max run time: 32,743,678 nanos
total run time: 34,189,949 nanos
run count: 4
avg run time: 8,547,487 nanos
dispatcher: general
max quantum: 100,000 nanos
quantum overruns: 3
MulticastConnectionPublisher (id 1)
min run time: 613 nanos
max run time: 751,792 nanos
total run time: 763,513 nanos
run count: 3
avg run time: 254,504 nanos
dispatcher: general
max quantum: 100,000 nanos
quantum overruns: 1
PingPong Main (id 2)
min run time: 10,541 nanos
max run time: 3,700,790 nanos
total run time: 3,711,331 nanos
run count: 2
avg run time: 1,855,665 nanos
dispatcher: general
max quantum: 100,000 nanos
quantum overruns: 1
Ping! Pong! Timer (id 3)
min run time: 1,260 nanos
max run time: 9,877,401 nanos
total run time: 10,195,402 nanos
run count: 5
avg run time: 2,039,080 nanos
dispatcher: general
max quantum: 100,000 nanos
quantum overruns: 2
Pinger (id 4)
min run time: 61 nanos
max run time: 33,913,494 nanos
total run time: 953,601,532 nanos
run count: 338,447
avg run time: 2,817 nanos
dispatcher: ping
max quantum: 100,000 nanos
quantum overruns: 179
Ponger (id 5)
min run time: 164 nanos
max run time: 4,439,180 nanos
total run time: 926,228,288 nanos
run count: 132,905
avg run time: 6,969 nanos
dispatcher: pong
max quantum: 100,000 nanos
quantum overruns: 235
Monitoring Run Queue Threads
eBus release 6.2.0 introduced the singleton class
RQMonitor
which monitors
Dispatcher run queue thread performance. This differs from
EClient.runTimeStats()
method which focuses on eBus
object performance. This monitoring includes:
-
Reporting run queue thread statistics at a configurable
interval. These reports are
RQThreadReport
notifications on theRQMonitor.REPORT_SUBJECT
. -
Reporting eBus object statistics at the same configurable
interval as the run queue thread report. These reports
are
EBusObjectReport
notifications on theRQMonitor.REPORT_SUBJECT
. Note that this report contains the same information asEClient.runTimeStats()
method. -
Reporting when an eBus object is overrunning the
dispatcher maximum quantum (plus a configurable limit).
An alarm is raised via the
ThreadOverrunUpdate
notification on theRQMonitor.RQALARM_SUBJECT
. -
Reporting when a run-ready eBus object is denied access
to a run queue thread for too long (again how long is
configurable). An alarm is raised via the
ThreadDenialUpdate
notification on theRQMonitor.RQALARM_SUBJECT
.
Alarms are raised when a condition transitions from cleared to alarmed or from alarmed to cleared. The run queue report contains the current thread alarm state.
RQMonitor
also implements
StatusReporter
interface
and will add run queue thread and eBus object stats to the
StatusReport
if
the RQMonitor
is started and
registered
with StatusReport
.
For more information on how to use the run queue thread
monitor, see RQMonitor
.
Connecting eBus Applications
eBus applications may be connected by having one
application
open a service
and the other applications
connect
to that service. See EServer
and
ERemoteApp
for complete
explanation of eBus service and connection configuration.
When two eBus applications connect, each eBus sends its local advertisements with remote scope to the other. This allows local subscriptions and requests with remote scope to be satisfied by remote advertisements. eBus does not forward an advertisement from one remote eBus to another remote eBus. Nor does eBus forward notifications and requests from one remote eBus to another remote eBus. eBus only routes messages from:
- a local client to a local client,
- from a local client to a remote client or
- from a remote client to a local client.
eBus allows only one connection to exist between two running applications. Once one eBus application connects to another, any further attempts to connect by either application will fail. eBus can also be configured to accept connections only from a predefined set of Internet addresses or even particular Internet address, TCP port pairs. This filtering is positive only. Negative filtering (accept all connections except those specified) is not supported.
eBus applications can programmatically monitor eBus
connections by subscribing to the message key
net.sf.eBus.client.ConnectionMessage:"/eBus"
.
Connection messages are published when a remote eBus either
logs on or logs off. Subscribe to the message key
net.sf.eBus.client.ServerMessage:"/eBus"
to learn when new connections are accepted by the
server
. Note: both
ConnectionMessage
and
ServerMessage
are published locally only.
Remote eBus applications cannot subscribe to these
notifications.
eBus allows each remote connection to be configured
independently. The first and most important remote
connection property/preference key is
eBus.connections
. This property is a
comma-separated list of remote connection names. Each name
is used in eBus.connection.name
property/preference key. If a
eBus.connection
key contains a
name
which does not
appear in eBus.connections
, then that
property/preference is ignored and will result in the
remote eBus connection not being established.
eBus Connection Types
eBus supports three connection type: TCP, SECURE_TCP, UDP,
and secure UDP. TCP is the default connection type. UDP was
introduced in eBus v. 5.3.0 providing unreliable messaging
between eBus applications. Secure TCP was introduced in
eBus v. 4.7.0 and provides SSL/TLS-based
secure socket
.
This secure socket requires that the user provides an
SSLContext
which is used to
create the SSLEngine
used to
establish the secure SSL/TLS connection over a TCP
connection. The good news is that
AsyncSecureSocket
has the same interface as
AsyncSocket
with both classes now extending
AbstractAsyncSocket
. Because eBus
remote connections reference the underlying async socket as
AbstractAsyncSocket
, users can easily switch
between plain-text, open socket connection and a secure
connection.
Secure UDP introduced in eBus v. 6.0.0 provides DTLS-based
secure datagram socket
.
Like secure TCP user is required to provide a
SSLContext
used to create the
SSLEngine
. The major difference between a
plain datagram socket and secure datagram socket is that
the user must connect the secure datagram socket to a peer
in order to create a secure "connection" between the two
sockets.
The bad news is that secure connection cannot be configured using eBus properties. The reason is that such configuration would require storing sensitive information in clear text, making the secure connection unsecure. This means secure connections can only be created using the connection builder API.
UDP also functions like a connection-based protocol in eBus rather than its natural connection-less manner. That is because eBus itself is connection-based and so requires the protocols it uses to behave in the same way. This means it is necessary for UDP "services" to be opened. Similar
eBus allows for multiple services to be opened. Each service is uniquely defined by connection type and port. The service configuration is used to define accepted connection configuration. This includes the same properties such as buffer sizes, heartbeat, and connection pause. Reconnection is not supported since accepted connections are not initiated by the local eBus application.
eBus now supports multicast "connections" for transmitting notification messages between eBus applications. Unlike the connection types mentioned above multicasting is not used to form a hard connection between applications. Instead notification messages are posted to the joined multicast group without knowing whether anyone is there to receive the packet. While this is a departure from how eBus works it is exactly how multicast works. See Multicast Connections for a more detailed explanation.
Sample eBus Connection Configuration
eBus can be configured on start-up by setting the eBus
properties in a file and references that file in the
java command line: -Dnet.sf.eBus.config.jsonFile=<conf file path>
eBus release 5.1.0 supports using
typesafe
to configure eBus.
services : [
{
name : tcp-remote
connectionType : TCP
port : 6789
inputBufferSize : 8192
outputBufferSize : 8192
messageQueueSize : 100
canPause : true
pause : {
pauseTime : 10m
maxBacklogSize : 50
}
},
{
name : udp-remote
connectionType : UDP
port : 6789
inputBufferSize : 1024
outputBufferSize : 1024
messageQueueSize : 10
canPause : true
}
]
connections : [
{
name : local
host : localhost
port : 12345
heartbeatDelay : 0
heartbeatReplyDelay : 0
reconnect : true
reconnectTime : 500ms
messageQueueSize : 0
inputBufferSize : 4096
outputBufferSize : 8192
canPause : true
pause : {
pauseTime : 5m
maxBacklogSize : 100
discardPolicy : YOUNGEST_FIRST
idleTime : 1m
maxConnectTime : 2m
resumeOnBacklogSize : 10
}
}
]
Building eBus Connections
eBus service and connection configurations can be
programmatically created using the
service builder
and
connection builder
,
respectively. These builders allow the above parameters to
be set as shown in the above properties file example. This
includes input and output buffer sizes, byte order, maximum
message queue size, selector thread, heartbeating rates,
and other parameters. Once the service or connection
configuration is set, the EConfigure.Service
or
EConfigure.RemoteConnection
configuration is
built. For more information, see the service and remote
connection documentation.
Example service and remote connection builders:
final AddressFilter filter = ...; final InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 12345);
final SSLContext secureContext = ...; final SSLContext secureContext = ...;
final EConfigure.ServerBuilder builder = EConfigure.serverBuilder(); final EConfigure.ConnectionBuilder builder = EConfigure.connectionBuilder();
EServer.openServer(builder.name("AppService") ERemoteApp.openConnection(builder.name("Conn0")
.port(6789) .address(address)
.connectionType(EConfigure.ConnectionType.SECURE_TCP) .bindPort(0)
.sslContext(secureContext) .connectionType(EConfigure.ConnectionType.SECURE_TCP)
.addressFilter(filter) .sslContext(secureContext)
.inputBufferSize(1_024) .inputBufferSize(4_096)
.outputBufferSize(1_024) .outputBufferSize(8_192)
.byteOrder(ByteOrder.BIG_ENDIAN) .byteOrder(ByteOrder.BIG_ENDIAN)
.messageQueueSize(10_000) .messageQueueSize(0)
.serviceSelector("svcSelector") .selector("connSelector")
.connectionSelector("connSelector") .reconnect(true)
.heartbeatDelayDuration.ofSeconds(60L)) .reconnectTime(Duration.ofMillis(500L))
.heartbeatReplyDelay(Duration.ofSeconds(30L)) .heartbeatDelay(Duration.ZERO)
.build()); .heartbeatReplyDelay(Duration.ZERO)
.build())
Both the server and remote connection are configured to use the
SSL/TLS-based secure TCP connection. This connection type
requires that the user provides the necessary
SSLContext
needed to establish the secure
connection. How that SSLContext
is securely created
is a user responsibility.
Pausing eBus Connections
eBus release 5.1.0 introduces connection pause. A client eBus may request that a connection be paused for a given duration and that the far-end queue up undelivered messages to a maximum backlog size. The far-end server eBus may accept or reject the pause request. If accepted, the far-end response contains the pause duration and backlog size allowed. These values will be ≤ the requested values. When the client receives this acceptance pause response, the connection is closed. The client then reconnects after the agreed upon delay and receives the undelivered messages (if any).
The client may resume the connection earlier than the agreed upon pause delay if it's local message queue reaches a configured limit. For example, both ends agree to pause for 5 minutes but the client eBus is configured to reconnect when the backlog contains 10 application messages (system messages are not used to calculate queue size). When the backlog has 10 application messages after 3 minutes, the client eBus resumes the paused connection.
Client connections are automatically paused after a specified idle time during which no messages are sent or received or after a maximum connect time. This is to prevent a busy connection from keeping the connection up permanently.
The pause feature is targeted for mobile devices which cannot maintain a network connection for any length of time without excessive battery drain.
eBus Protocol Stack
The following demonstrates how the eBus protocol stack works and how the configuration impacts the protocol stack.
Level | Description | Input | Output |
---|---|---|---|
ERemoteApp |
Responsible for maintaining a connection to a remote eBus application.
|
Forwards messages to the target
ESubject or
ERequest
instances except the logon, logon reply, and
logoff messages.
|
Passes outbound system and user messages to the
associated
|
ETCPConnection | Responsible for serializing outbound messages and deserializing inbound messages. Also responsible for queuing up outbound messages when the socket output buffer is full. Sends the enqueued messages when the buffer overflow condition clears. |
De-serializes eBus messages directly from the
|
Serializes outbound message directly to the socket
output buffer using the
|
AsyncSocket |
Interface between EConnection and the Selector
Thread. Encapsulates the
SelectableChannel .
|
Inbound bytes are retrieved directly to a socket
input buffer. When a read occurs, the newly
copy input buffer is directly passed to
|
Outbound messages are serialized to a socket output
buffer. If the output buffer size is exceeded, then
throws a
The message is transmitted immediately if the socket output buffer has no other pending messages.
|
Selector Thread |
Watches
Note: there is only one
|
Reads bytes from a
|
The Selector Thread passes the
|
The one weakness to this design is if
SelectorThread
loses its CPU, then all
remote eBus messaging stops until the Selector
thread re-gains a CPU. For this reason, the Selector thread
prior is set to Thread.MAX_PRIORITY
in an
attempt to keep the thread on a CPU.
Multicast Connections
eBus multicast connections are built around one or more
notification
multi-feeds
. A
multicast connection either acts as a publisher or a
subscriber. If a publisher then
subscription multi-feeds
are used to receive notifications with remote
scope
and the
send those messages to the joined multicast group. If a
subscriber then
publish multi-feeds
are used to post received multicast notifications to the
local eBus. Again these multi-feeds have a remote feed
scope.
Like TCP and UDP, multicast connections may be created
using either typesafe configuration or the eBus API.
Java Properties
configuration is not supported.
Also (while this may be obvious) multicast connections
cannot be paused.
Sample eBus Multicast Configuration
These two configurations are for a publisher and subscriber multicast connection designed to communicate across the same group. Required properties are in bold.
multicastGroups : [ multicastGroups : [
{ {
name : MCAST-PUB-0 name : MCAST-SUB-0
multicastRole : PUBLISHER multicastRole : SUBSCRIBER
multicastGroup : "230.0.0.0" multicastGroup : "230.0.0.0"
targetPort : 5000 targetPort : 5001
networkIF : en8 networkIF : en8
protocolFamily : INET protocolFamily : INET
bindPort : 5001 bindPort : 5000
order : LITTLE_ENDIAN order : LITTLE_ENDIAN
selector : "mcastSelector" selector : "mcastSelector"
inputSize : 512 inputSize : 512
outputSize : 512 outputSize : 512
# EMultiPublishFeeds defining notification messages # EMultiSubscribeFeeds defining notification messages
# posted to the multicast group. # received from multicast group and published to eBus.
multicastKeys : [ multicastKeys : [
{ {
multifeedType : LIST multifeedType : QUERY
messageClass : com.acme.personal.TextMessage messageClass : com.acme.personal.TextMessage
subjectList : [ subjectQuery : "[A-M][A-Z]*"
"Tom", isDynamic : false
"Dick", },
"Harry" {
] multifeedType : LIST
}, messageClass : com.acme.MarketData.Trade
{ subjectList : [
multifeedType : QUERY "ABCD",
messageClass : com.acme.marketData.Trade "EFG",
subjectQuery : "[A-M][A-Z]*" "STU",
isDynamic : true ]
} }
] ]
} }
]
Building eBus Multicast Connections
Multicast connections are created in two steps: first the
multicast configuration is created and then that
configuration is used to join the multicast group. This is
demonstrated below creating MCAST-PUB-0
shown above.
final EConfigure.MulticastBuilder mcastBuilder = EConfigure.multicastBuilder();
final InetAddress group = InetAddress.getByName("230.0.0.0");
final NetworkInterface netIF = NetworkInterface.getByName("en8");
final List<String> textSubjects = com.google.common.collect.ImmutableList<>.of("Tom", "Dick", "Harry");
final EConfigure.McastNotifyConfig listFeed =
(EConfigure.notificationBuilder()).feedType(EConfigure.MultifeedType.LIST)
.messageClass(com.acme.personal.TextMessage.getCanonicalName())
.subjectList(textSubjects)
.build();
final EConfigure.McastNotifyConfig queryFeed =
(EConfigure.notificationBuilder()).feedType(EConfigure.MultifeedType.QUERY)
.messageClass(com.acme.marketData.Trade.getCanonicalName())
.subjectQuery("[A-M][A-Z]*")
.build();
final List<EConfigure.McastNotifyConfig> mcastKeys = com.google.common.collect.ImmutableList<>.of(listFeed, queryFeed);
final EConfigure.MulticastConnection mcastConfig =
mcastBuilder.name("MCAST-PUB-0")
.role(EConfigure.MulticastRole.PUBLISHER)
.group(group)
.targetPort(5001)
.networkInterface(netIF)
.protocolFamily(StandardProtocolFamily.INET)
.bindPort(5000)
.byteOrder(ByteOrder.LITTLE_ENDIAN)
.inputBufferSize(512)
.outputBufferSize(512)
.notifications(mcastKeys)
.build();
EMulticastConnection.openConnection(mcastConfig);
Monitoring eBus Connections
If there is a need to dynamically monitor eBus connection status (client, server, or multicast) then subscribe to the following message keys, one for each eBus connection type:
-
client:
ConnectionMessage
.MESSAGE_KEY
-
server:
ServerMessage
.MESSAGE_KEY
-
multicast:
MulticastMessage
.MESSAGE_KEY
Subscribe to the messages as follows (note - these messages are published to the local eBus JVM only):
ESubscribeFeed.open(subscriber,
ConnectionMessage.MESSAGE_KEY,
FeedScope.LOCAL_ONLY,
condition);
eBus sends updates as a connection goes through the process of connecting, disconnecting, accepting connections, or joining a multicast group. Since there is no way to retrieve the current status (at this time) it is best to put the subscription(s) in place before opening connections or services.
eBus Extensions
eBus currently has three extension packages:
net.sf.eBus.IMessageExhaust
,
net.sf.eBusx.io
,
net.sf.eBusx.monitor
, and
net.sf.eBusx.util
.
Message Exhuast
eBus release 6.2.0 adds the interface
IMessageExhaust
. An application
implements IMessageExhaust
and passes an instance
of that implementation to
EFeed.setExhaust(net.sf.eBus.client.IMessageExhaust)
.
The local eBus then passes all notification,
request, and reply messages flowing through eBus to the
registered message exhaust.
Note: only one exhaust instance may be may be
registered at a time. If the application does not want to
exhaust all messages, then the IMessageExhaust
implementation is responsible for filtering out unwanted
messages. If the application needs to exhaust messages to
different persistent stores, again the registered exhaust
is responsible for interfacing with those multiple stores.
The application is responsible for opening and closing
exhaust persistent stores when appropriate. Message
exhaust is "turned off" by passing a null
IMessageExhaust
value to EFeed.setExhaust
.
resulting in the default message exhaust which does nothing
with the given eBus message.
IMessageExhaust.exhaust(EMessage)
method is called
from an eBus dispatcher thread. This means that message
exhaust does not interfere with eBus message forwarding -
meaning that the exhaust process is not in-line with the
message forwarding process.
net.sf.eBusx.io
EFileWatcher
is an eBus publisher
notifying subscribers when the subscribed file or directory
is created, modified, or deleted via the
EFileNotification
message.
Subscribers use the target file or directory name as the
subscription subject. Note: the file watcher service
supports only subscribers within the same JVM. The file
watcher is not advertised to other JVMs, whether on the
same host or on a remote host.
net.sf.eBusx.monitor
This package provides eBus applications the ability to
monitor other eBus application and to be monitored.
Monitoring is on the
eBus object
level.
See net.sf.eBusx.monitor
for a detailed
explanation on using the eBus monitor API.
net.sf.eBusx.util
This package provides the eBus timer service. This service
provides an eBus request/reply interface to the
Timer
service.
1
An application object can control whether it communicates
with another object that is either local to its JVM and/or
in another JVM using
EFeed.FeedScope
.
2
As homogenous as Java allows. For example, it is possible
to declare a Number[]
array and place
Integer
and Double
instances into the
array.
3 A reply message already posted to the requestor's message queue will be delivered. Reply messages not yet posted to that queue will not be delivered.
4
java.util.Map
is not a supported eBus
field type. eBus is able to use an unsupported type here
is because MatchEvent
is never transported to
a remote application and this unsupported type is not
detected.
Package | Description |
---|---|
net.sf.eBus.client |
This package contains the top-level eBus API which
applications use to interact with eBus.
|
net.sf.eBus.client.monitor |
This package is centered on
RQMonitor class. |
net.sf.eBus.client.sysmessages |
This package defines the eBus system messages.
|
net.sf.eBus.config |
Provides classes needed to configure eBus network and eBus
core modules.
|
net.sf.eBus.feed.historic |
This package contains a specialized feed providing
notification feeds which support both historic and live
messages.
|
net.sf.eBus.feed.historic.store |
This package contains two implementations of
IEMessageStore interface:
InMemoryMessageStore
and
SqlMessageStore . |
net.sf.eBus.feed.pattern |
This package contains specialized feeds extending the basic
feeds in
net.sf.eBus.client . |
net.sf.eBus.logging |
This package extends slf4j package with an asynchronous
logger.
|
net.sf.eBus.messages |
Messages are the heart of eBus.
|
net.sf.eBus.messages.type |
Provides classes for translating Java built-in types to and
from binary encoding.
|
net.sf.eBus.net |
Provides asynchronous API for Java NIO package.
|
net.sf.eBus.test |
Provides classes supporting JUnit testing.
|
net.sf.eBus.text |
TokenLexer takes a given input and
returns a series of analyzed tokens. |
net.sf.eBus.timer |
eBus release 7.3.0 introduces
EScheduledExecutor which provides a
similar functionality as
ScheduledExecutorService with the
difference that expired timer tasks are dispatched to an eBus
object's task queue. |
net.sf.eBus.util |
Contains supporting utility Java classes for
net.sf.eBus . |
net.sf.eBus.util.logging |
Supplements
java.util.logging package with a
rolling log file handler, a pattern formatter and a logged
status report. |
net.sf.eBus.util.regex |
This regular expression package is designed to perform
efficient pattern matching over a
TernarySearchTree . |
net.sf.eBusx.geo |
This package defines
eBus message fields
implementing GeoJSON data types as defined in
GeoJSON specification.. |
net.sf.eBusx.io |
This package provides the ability to monitor file/directory
creation, modification, and deletion via the
file notification . |
net.sf.eBusx.monitor |
This package provides the ability to instrument
eBus objects and to monitor
those objects' on-going state and transient events. |
net.sf.eBusx.time |
This package provides single Java class:
EInterval . |
net.sf.eBusx.util |
This package provides an eBus interface for accessing
Timer API. |