Class MessageCollector<T1, T2>
A helper class that collects messages in a thread-safe manner until a message of every type has been collected.
Namespace: Agents.Net
Assembly: Agents.Net.dll
Syntax
public class MessageCollector<T1, T2> : object where T1 : Message where T2 : Message
Type Parameters
Name | Description |
---|---|
T1 | First message type. |
T2 | Second message type. |
Remarks
The collector collects messages until all message types are collected. Afterwards it executes an action with the collected MessageCollection. The following scenarios explain the different aspects of this class.
Overriding messages
Assume a message of type T1 was collected and a message of type T2 was not. When another message of type T1 is collected, it replaces the old T1 message. No action is executed.
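The replacement rule can be illustrated with a minimal sketch. `TwoSlotCollector` is a hypothetical stand-in written for this illustration, not the Agents.Net implementation; it ignores message domains and assumes any reference type can serve as a message.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for MessageCollector<T1, T2>; it ignores message domains.
public sealed class TwoSlotCollector<T1, T2>
    where T1 : class
    where T2 : class
{
    private readonly object sync = new object();
    private readonly Action<T1, T2> onCollected;
    private T1 first;
    private T2 second;

    public TwoSlotCollector(Action<T1, T2> onCollected)
    {
        this.onCollected = onCollected;
    }

    public void Push(object message)
    {
        T1 completeFirst;
        T2 completeSecond;
        lock (sync)
        {
            // A newer message of the same type replaces the stored one.
            if (message is T1 m1) first = m1;
            else if (message is T2 m2) second = m2;
            else throw new ArgumentException("Unsupported message type.", nameof(message));

            // The set is still incomplete, so no action is executed.
            if (first == null || second == null) return;

            completeFirst = first;
            completeSecond = second;
        }
        // Run the callback outside the lock so it cannot block other pushes.
        onCollected(completeFirst, completeSecond);
    }
}
```

Pushing two `T1` values followed by one `T2` value invokes the callback once, with the second `T1` value.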
Considering message domains
Assume the following MessageDomains and collected messages:

     ---------------          ------------
    | DefaultDomain | ----> | SubDomain1 |
     ---------------    |     ------------
      T1 Message1       |     T1 Message2
                        |     T2 Message4
                        |
                        |     ------------
                        ----> | SubDomain2 |
                              ------------
                              T2 Message3
                              T2 Message5

In this scenario the messages are processed as follows:
- Message1 is collected.
- Message2 is collected. It does not override Message1 because it belongs to a different MessageDomain; both are stored in parallel.
- Message3 is collected. SubDomain2 now has a complete set of Message1 + Message3.
- Message4 is collected. SubDomain1 now has a complete set of Message2 + Message4.
- Message5 is collected. It overrides Message3 because both are in the same MessageDomain, and a new set of Message1 + Message5 is executed.
In the end, the following sets were executed in order:
- Message1 + Message3
- Message2 + Message4
- Message1 + Message5
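The scenario above can be reproduced with a simplified, single-threaded model. `Domain` and `DomainCollector` are hypothetical names introduced here, and messages are modelled as plain strings. The sketch assumes that a missing message is looked up in the pushing domain first and then in its parent domains; it does not cover a push into a parent domain completing sets in child domains.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for MessageDomain: a name plus an optional parent.
public sealed class Domain
{
    public Domain(string name, Domain parent = null)
    {
        Name = name;
        Parent = parent;
    }

    public string Name { get; }
    public Domain Parent { get; }
}

// Hypothetical, single-threaded model of a two-type collector with domains.
public sealed class DomainCollector
{
    private readonly Dictionary<Domain, string> firstType = new Dictionary<Domain, string>();
    private readonly Dictionary<Domain, string> secondType = new Dictionary<Domain, string>();

    public List<string> ExecutedSets { get; } = new List<string>();

    public void PushFirst(Domain domain, string message)
    {
        firstType[domain] = message; // replaces only within the same domain
        if (TryFind(domain, secondType, out string other))
        {
            ExecutedSets.Add(message + " + " + other);
        }
    }

    public void PushSecond(Domain domain, string message)
    {
        secondType[domain] = message; // replaces only within the same domain
        if (TryFind(domain, firstType, out string other))
        {
            ExecutedSets.Add(other + " + " + message);
        }
    }

    // Walks from the given domain up through its parents until a message is found.
    private static bool TryFind(Domain domain, Dictionary<Domain, string> pool, out string message)
    {
        for (Domain current = domain; current != null; current = current.Parent)
        {
            if (pool.TryGetValue(current, out message)) return true;
        }
        message = null;
        return false;
    }
}
```

Pushing Message1 to Message5 in order, each into the domain shown in the diagram, yields the three sets listed above in the same order.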
Consumed vs not consumed messages
Messages can be consumed during execution with the method MarkAsConsumed(Message). A consumed message is removed from the message collector immediately. In the example above, Message1 is used twice. Assume that during the first execution (Message1 + Message3) the agent called collection.MarkAsConsumed(Message1). In this case the third execution (Message1 + Message5) would not have happened, because Message1 would already have been removed from the collector instance.
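A minimal sketch of the consumed versus not consumed behaviour follows. `CollectedSet` and `ConsumingCollector` are hypothetical stand-ins that model messages as strings and ignore domains and threading; only the method name `MarkAsConsumed` is taken from the text above.

```csharp
using System;

// Hypothetical stand-in for MessageCollection<T1, T2>, with messages modelled as strings.
public sealed class CollectedSet
{
    private readonly ConsumingCollector owner;

    internal CollectedSet(ConsumingCollector owner, string first, string second)
    {
        this.owner = owner;
        First = first;
        Second = second;
    }

    public string First { get; }
    public string Second { get; }

    // Removes the message from the owning collector right away.
    public void MarkAsConsumed(string message)
    {
        owner.Remove(message);
    }
}

// Hypothetical, single-threaded collector that keeps messages until they are consumed.
public sealed class ConsumingCollector
{
    private readonly Action<CollectedSet> onCollected;
    private string first;
    private string second;

    public ConsumingCollector(Action<CollectedSet> onCollected)
    {
        this.onCollected = onCollected;
    }

    public int Executions { get; private set; }

    public void PushFirst(string message) { first = message; ExecuteIfComplete(); }

    public void PushSecond(string message) { second = message; ExecuteIfComplete(); }

    internal void Remove(string message)
    {
        if (first == message) first = null;
        if (second == message) second = null;
    }

    private void ExecuteIfComplete()
    {
        if (first == null || second == null) return;
        Executions++;
        onCollected(new CollectedSet(this, first, second));
    }
}
```

With a callback that consumes `set.First`, pushing Message1, Message3 and Message5 executes once. With a callback that consumes nothing, the same pushes execute twice.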
Examples
Here is a typical example of how to set up and use the collector in a class:

    [Consumes(typeof(Message1))]
    [Consumes(typeof(Message2))]
    public class MessageCollectorAgent : Agent
    {
        private readonly MessageCollector<Message1, Message2> collector;

        public MessageCollectorAgent(IMessageBoard messageBoard) : base(messageBoard)
        {
            collector = new MessageCollector<Message1, Message2>(OnMessagesCollected);
        }

        private void OnMessagesCollected(MessageCollection<Message1, Message2> set)
        {
            // Execute the complete set here.
        }

        protected override void ExecuteCore(Message messageData)
        {
            // Forward every consumed message to the collector.
            collector.Push(messageData);
        }
    }

Constructors
MessageCollector(Action<MessageCollection<T1, T2>>)
Initializes a new instance of the class MessageCollector<T1, T2>.
Declaration
public MessageCollector(Action<MessageCollection<T1, T2>> onMessagesCollected = null)
Parameters
Type | Name | Description |
---|---|---|
Action<MessageCollection<T1, T2>> | onMessagesCollected | The action which is executed when all messages were collected. |
Properties
Messages1
Store for messages of type T1.
Declaration
protected ConcurrentDictionary<MessageDomain, MessageStore<T1>> Messages1 { get; }
Property Value
Type | Description |
---|---|
ConcurrentDictionary<MessageDomain, MessageStore<T1>> |
Messages2
Store for messages of type T2.
Declaration
protected ConcurrentDictionary<MessageDomain, MessageStore<T2>> Messages2 { get; }
Property Value
Type | Description |
---|---|
ConcurrentDictionary<MessageDomain, MessageStore<T2>> |
Methods
Aggregate(Message, Boolean)
Overridden by inheriting classes to try to add the message to the collector dictionaries.
Declaration
protected virtual bool Aggregate(Message message, bool throwError)
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message to add |
Boolean | throwError | If set to |
Returns
Type | Description |
---|---|
Boolean | |
Execute(MessageCollection)
Overridden by inheriting classes to execute the typeless message collection cast to the specific collector type.
Declaration
protected virtual void Execute(MessageCollection messageCollection)
Parameters
Type | Name | Description |
---|---|---|
MessageCollection | messageCollection | The typeless collection. |
ExecutePushAndContinue(Message, Action<MessageCollection>, CancellationToken)
Executes the routine for PushAndContinue(Message, Action<MessageCollection<T1, T2>>, CancellationToken).
Declaration
protected void ExecutePushAndContinue(Message message, Action<MessageCollection> executeAction, CancellationToken cancellationToken)
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message which is pushed. |
Action<MessageCollection> | executeAction | The wrapper action that invokes the action passed to the PushAndContinue(Message, Action<MessageCollection<T1, T2>>, CancellationToken) method. |
CancellationToken | cancellationToken | Cancellation token to stop the continue operation. |
ExecutePushAndExecute(Message, Action<MessageCollection>, CancellationToken)
Executes the routine for PushAndExecute(Message, Action<MessageCollection<T1, T2>>, CancellationToken).
Declaration
protected bool ExecutePushAndExecute(Message message, Action<MessageCollection> executeAction, CancellationToken cancellationToken)
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message which is pushed. |
Action<MessageCollection> | executeAction | The wrapper action that invokes the action passed to the PushAndExecute(Message, Action<MessageCollection<T1, T2>>, CancellationToken) method. |
CancellationToken | cancellationToken | Cancellation token to stop the wait operation. |
Returns
Type | Description |
---|---|
Boolean | |
GetCompleteSets(MessageDomain)
Overridden by inheriting classes to get all sets of messages for a specific domain, without a specific type.
Declaration
protected IEnumerable<MessageCollection> GetCompleteSets(MessageDomain domain)
Parameters
Type | Name | Description |
---|---|---|
MessageDomain | domain | The domain for which sets should be found. |
Returns
Type | Description |
---|---|
IEnumerable<MessageCollection> | An enumeration of all completed sets for the domain. |
IsCompleted(MessageDomain, out MessageCollection)
Overridden by inheriting classes to see if there is a completed set for the specified domain.
Declaration
protected virtual bool IsCompleted(MessageDomain domain, out MessageCollection messageCollection)
Parameters
Type | Name | Description |
---|---|---|
MessageDomain | domain | The domain which should be completed. |
MessageCollection | messageCollection | The message collection with the complete set. |
Returns
Type | Description |
---|---|
Boolean | |
Push(Message)
Add a message to the collector.
Declaration
public void Push(Message message)
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message object to add. |
PushAndContinue(Message, Action<MessageCollection<T1, T2>>, CancellationToken)
Adds a message to the collector and continues with the specified action once the complete set is found.
Declaration
public void PushAndContinue(Message message, Action<MessageCollection<T1, T2>> onCollected, CancellationToken cancellationToken = default(CancellationToken))
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message which is added to the collector. |
Action<MessageCollection<T1, T2>> | onCollected | The action which is executed when the complete set is found. |
CancellationToken | cancellationToken | Cancellation token to stop the continue operation. |
PushAndExecute(Message, Action<MessageCollection<T1, T2>>, CancellationToken)
Adds a message to the collector and waits for the complete set to execute the specified action.
Declaration
public bool PushAndExecute(Message message, Action<MessageCollection<T1, T2>> onCollected, CancellationToken cancellationToken = default(CancellationToken))
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message which is added to the collector. |
Action<MessageCollection<T1, T2>> | onCollected | The action which is executed when the complete set is found. |
CancellationToken | cancellationToken | Cancellation token to stop the wait operation. |
Returns
Type | Description |
---|---|
Boolean | |
Remarks
This method is helpful for InterceptorAgents, where the agent must wait in the InterceptCore(Message) method for a set of messages before returning the InterceptionAction.
Another example is an InterceptorAgent that wants to wait for a single message. In this case the first message is the message that is intercepted, and the second message is the message the agent needs. The advantage is that the collector respects message domains.
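The blocking behaviour described in these remarks can be sketched with standard .NET primitives. `WaitingCollector` is a hypothetical stand-in, not the Agents.Net implementation: it models messages as strings, ignores domains, and assumes that a return value of `false` means the wait was cancelled, which the reference above does not state.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in that blocks the caller until both messages are present.
public sealed class WaitingCollector
{
    private readonly object sync = new object();
    private readonly ManualResetEventSlim complete = new ManualResetEventSlim(false);
    private string first;
    private string second;

    // Stores a message and signals waiting callers once the set is complete.
    public void Push(bool isFirst, string message)
    {
        lock (sync)
        {
            if (isFirst) first = message;
            else second = message;

            if (first != null && second != null) complete.Set();
        }
    }

    // Pushes a message, waits for the complete set, then runs the action.
    // Assumption: returns false when the wait is cancelled.
    public bool PushAndExecute(bool isFirst, string message, Action<string, string> onCollected,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        Push(isFirst, message);
        try
        {
            complete.Wait(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            return false;
        }

        string a;
        string b;
        lock (sync)
        {
            a = first;
            b = second;
        }
        onCollected(a, b);
        return true;
    }
}
```

In an interceptor-style use, one thread would call `PushAndExecute` with the intercepted message and block there, while another thread later calls `Push` with the message the first thread needs.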
RemoveMessage(Message)
Overridden by inheriting classes to remove the message passed to the method.
Declaration
protected virtual void RemoveMessage(Message message)
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message to remove from the collector. |
TryGetMessageFittingDomain<T>(MessageDomain, ConcurrentDictionary<MessageDomain, MessageStore<T>>, out MessageStore<T>)
Used to get a message from the message dictionaries.
Declaration
protected bool TryGetMessageFittingDomain<T>(MessageDomain domain, ConcurrentDictionary<MessageDomain, MessageStore<T>> messagePool, out MessageStore<T> message)
where T : Message
Parameters
Type | Name | Description |
---|---|---|
MessageDomain | domain | The message domain to get the message for. |
ConcurrentDictionary<MessageDomain, MessageStore<T>> | messagePool | The dictionary for the specific message type. |
MessageStore<T> | message | The message for the domain. |
Returns
Type | Description |
---|---|
Boolean | |
Type Parameters
Name | Description |
---|---|
T | The type of the message. |
TryPush(Message)
Tries to add the message to the collector.
Declaration
public bool TryPush(Message message)
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message object to add. |
Returns
Type | Description |
---|---|
Boolean | |
UpdateMessagePool<T>(T, ConcurrentDictionary<MessageDomain, MessageStore<T>>)
Add the message to the specified message pool.
Declaration
protected void UpdateMessagePool<T>(T message, ConcurrentDictionary<MessageDomain, MessageStore<T>> messagePool)
where T : Message
Parameters
Type | Name | Description |
---|---|---|
T | message | The message to add. |
ConcurrentDictionary<MessageDomain, MessageStore<T>> | messagePool | The message pool to add the message to. |
Type Parameters
Name | Description |
---|---|
T | The type of the message. |
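One plausible way to update such a pool is `ConcurrentDictionary.AddOrUpdate`, which adds or replaces an entry atomically per key. The sketch below is an assumption about the general technique, not the library's actual implementation; `MessagePoolSketch` is a hypothetical helper that uses strings in place of MessageDomain and MessageStore<T>.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper; strings stand in for MessageDomain and MessageStore<T>.
public static class MessagePoolSketch
{
    // Stores the message for its domain, replacing any previous message of that domain.
    public static void Update(ConcurrentDictionary<string, string> pool, string domain, string message)
    {
        pool.AddOrUpdate(domain, message, (key, previous) => message);
    }
}
```

Calling `Update` twice for the same domain leaves only the newer message in the pool, while entries for other domains stay untouched.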