    Class MessageCollector<T1, T2>

    This helper class collects messages in a thread-safe manner until a message of every message type has been collected.

    Inheritance
    Object
    MessageCollector<T1, T2>
    MessageCollector<T1, T2, T3>
    Namespace: Agents.Net
    Assembly: Agents.Net.dll
    Syntax
    public class MessageCollector<T1, T2> : object where T1 : Message where T2 : Message
    Type Parameters
    Name Description
    T1

    First message type.

    T2

    Second message type.

    Remarks

    The collector collects messages until a message of every message type has been collected. Afterwards it executes an action with the collected MessageCollection. The following scenarios explain the different aspects of this class.

    Overriding messages

    Assume a message of type T1 was collected and no message of type T2 yet. When another message of type T1 is collected, it replaces the old T1 message. No action is executed.

    Considering message domains

    Assume the following MessageDomains and collected messages:

     ---------------         ------------
    | DefaultDomain | ----> | SubDomain1 |
     ---------------  |      ------------
    T1 Message1       |      T1 Message2
                      |      T2 Message4
                      |
                      |      ------------
                      ----> | SubDomain2 |
                             ------------
                             T2 Message3
                             T2 Message5
    The following happens in this scenario. Message1 is collected. Message2 is collected; it does not override Message1 because it belongs to a different MessageDomain, so both are stored in parallel. Message3 is collected, which completes a set of Message1 + Message3 for SubDomain2. Message4 is collected, which completes another set of Message2 + Message4 in SubDomain1. Message5 is collected and overrides Message3 because both are in the same MessageDomain, so a new set of Message1 + Message5 is executed. In the end the following sets were executed in order:
    1. Message1 + Message3
    2. Message2 + Message4
    3. Message1 + Message5
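
    The scenario above can be traced with a small toy model. This is a minimal sketch and not the Agents.Net implementation: Domain, ToyCollector, PushFirst, PushSecond and Executed are hypothetical names, messages are plain strings, and the model only searches the pushed message's own domain and its parents for a partner message. It is single-threaded and has no consumption logic.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: hypothetical stand-ins, not the Agents.Net classes.
public sealed class Domain
{
    public Domain(string name, Domain parent = null) { Name = name; Parent = parent; }
    public string Name { get; }
    public Domain Parent { get; }
}

public sealed class ToyCollector
{
    // One pool per message type, keyed by the domain the message was sent in.
    private readonly Dictionary<Domain, string> first = new Dictionary<Domain, string>();
    private readonly Dictionary<Domain, string> second = new Dictionary<Domain, string>();

    // Records every completed set in execution order.
    public List<string> Executed { get; } = new List<string>();

    public void PushFirst(Domain domain, string message) => Push(first, domain, message);
    public void PushSecond(Domain domain, string message) => Push(second, domain, message);

    private void Push(Dictionary<Domain, string> pool, Domain domain, string message)
    {
        // Same type and same domain: the newer message replaces the older one.
        pool[domain] = message;
        if (TryFind(first, domain, out string m1) && TryFind(second, domain, out string m2))
        {
            Executed.Add(m1 + " + " + m2);
        }
    }

    // Look in the domain itself first, then walk up through its parents.
    private static bool TryFind(Dictionary<Domain, string> pool, Domain domain, out string message)
    {
        for (Domain current = domain; current != null; current = current.Parent)
        {
            if (pool.TryGetValue(current, out message)) return true;
        }
        message = null;
        return false;
    }
}

public static class Program
{
    public static void Main()
    {
        var defaultDomain = new Domain("DefaultDomain");
        var sub1 = new Domain("SubDomain1", defaultDomain);
        var sub2 = new Domain("SubDomain2", defaultDomain);

        var collector = new ToyCollector();
        collector.PushFirst(defaultDomain, "Message1"); // T1 in DefaultDomain
        collector.PushFirst(sub1, "Message2");          // T1 in SubDomain1, stored in parallel
        collector.PushSecond(sub2, "Message3");         // completes Message1 + Message3
        collector.PushSecond(sub1, "Message4");         // completes Message2 + Message4
        collector.PushSecond(sub2, "Message5");         // overrides Message3, completes Message1 + Message5

        foreach (string set in collector.Executed) Console.WriteLine(set);
    }
}
```

    Running Main prints the three sets in the same order as the list above. The parent lookup in TryFind is what lets Message1 from DefaultDomain take part in sets of both sub domains in this model.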

    Consumed vs not consumed messages

    Messages can be consumed during execution with the method MarkAsConsumed(Message). A consumed message is removed from the message collector immediately. In the example above, Message1 is used twice. Assume that during the first execution of Message1 + Message3 the agent executed collection.MarkAsConsumed(Message1). In that case the third execution, Message1 + Message5, would not have happened, because Message1 would already have been cleared from the collector instance.
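
    A handler that consumes the T1 message might look like the following sketch. It reuses the agent layout from the Examples section and needs the Agents.Net package plus application-defined Message1 and Message2 classes, which are not declared here. The class name ConsumingCollectorAgent is hypothetical, and the property set.Message1 is an assumption about how the collection exposes the collected T1 message.

```csharp
using Agents.Net;

// Sketch only: Message1 and Message2 are application-defined message classes.
[Consumes(typeof(Message1))]
[Consumes(typeof(Message2))]
public class ConsumingCollectorAgent : Agent
{
    private readonly MessageCollector<Message1, Message2> collector;

    public ConsumingCollectorAgent(IMessageBoard messageBoard) : base(messageBoard)
    {
        collector = new MessageCollector<Message1, Message2>(OnMessagesCollected);
    }

    private void OnMessagesCollected(MessageCollection<Message1, Message2> set)
    {
        // Assumption: the collected T1 message is available as set.Message1.
        // Once consumed, the collector no longer holds it, so a later T2 message
        // cannot form another set with this T1 message.
        set.MarkAsConsumed(set.Message1);
    }

    protected override void ExecuteCore(Message messageData)
    {
        collector.Push(messageData);
    }
}
```

    Whether to consume a message is a design choice: leave it unconsumed when it should pair with several later messages, and consume it when each set must be handled exactly once.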

    Examples

    Here is a typical example of how to set up and use the collector in a class:

    [Consumes(typeof(Message1))]
    [Consumes(typeof(Message2))]
    public class MessageCollectorAgent : Agent
    {
        private readonly MessageCollector<Message1, Message2> collector;
    
        public MessageCollectorAgent(IMessageBoard messageBoard) : base(messageBoard)
        {
            collector = new MessageCollector<Message1, Message2>(OnMessagesCollected);
        }
    
        private void OnMessagesCollected(MessageCollection<Message1, Message2> set)
        {
            //execute set
        }
    
        protected override void ExecuteCore(Message messageData)
        {
            collector.Push(messageData);
        }
    }

    Constructors

    MessageCollector(Action<MessageCollection<T1, T2>>)

    Initializes a new instance of the MessageCollector<T1, T2> class.

    Declaration
    public MessageCollector(Action<MessageCollection<T1, T2>> onMessagesCollected = null)
    Parameters
    Type Name Description
    Action<MessageCollection<T1, T2>> onMessagesCollected

    The action that is executed when all messages have been collected.

    Properties

    Messages1

    Store for messages of type T1.

    Declaration
    protected ConcurrentDictionary<MessageDomain, MessageStore<T1>> Messages1 { get; }
    Property Value
    Type Description
    ConcurrentDictionary<MessageDomain, MessageStore<T1>>

    Messages2

    Store for messages of type T2.

    Declaration
    protected ConcurrentDictionary<MessageDomain, MessageStore<T2>> Messages2 { get; }
    Property Value
    Type Description
    ConcurrentDictionary<MessageDomain, MessageStore<T2>>

    Methods

    Aggregate(Message, Boolean)

    Overridden by inheriting classes to try to add the message to the collector dictionaries.

    Declaration
    protected virtual bool Aggregate(Message message, bool throwError)
    Parameters
    Type Name Description
    Message message

    The message to add.

    Boolean throwError

    If set to true, throws an error if the message could not be added.

    Returns
    Type Description
    Boolean

    true if the message was added to any dictionary; otherwise false.

    Execute(MessageCollection)

    Overridden by inheriting classes to execute the typeless message collection cast to the specific collector type.

    Declaration
    protected virtual void Execute(MessageCollection messageCollection)
    Parameters
    Type Name Description
    MessageCollection messageCollection

    The typeless collection.

    ExecutePushAndContinue(Message, Action<MessageCollection>, CancellationToken)

    Executes the routine for PushAndContinue(Message, Action<MessageCollection<T1, T2>>, CancellationToken).

    Declaration
    protected void ExecutePushAndContinue(Message message, Action<MessageCollection> executeAction, CancellationToken cancellationToken)
    Parameters
    Type Name Description
    Message message

    The message which is pushed.

    Action<MessageCollection> executeAction

    The action that invokes the action which was passed to the PushAndContinue(Message, Action<MessageCollection<T1, T2>>, CancellationToken) method.

    CancellationToken cancellationToken

    Cancellation token to stop the continue operation.

    ExecutePushAndExecute(Message, Action<MessageCollection>, CancellationToken)

    Executes the routine for PushAndExecute(Message, Action<MessageCollection<T1, T2>>, CancellationToken).

    Declaration
    protected bool ExecutePushAndExecute(Message message, Action<MessageCollection> executeAction, CancellationToken cancellationToken)
    Parameters
    Type Name Description
    Message message

    The message which is pushed.

    Action<MessageCollection> executeAction

    The action that invokes the action which was passed to the PushAndExecute(Message, Action<MessageCollection<T1, T2>>, CancellationToken) method.

    CancellationToken cancellationToken

    Cancellation token to stop the wait operation.

    Returns
    Type Description
    Boolean

    true if the action was executed; otherwise false.

    GetCompleteSets(MessageDomain)

    Overridden by inheriting classes to get all sets of messages for a specific domain without a specific type.

    Declaration
    protected IEnumerable<MessageCollection> GetCompleteSets(MessageDomain domain)
    Parameters
    Type Name Description
    MessageDomain domain

    The domain for which sets should be found.

    Returns
    Type Description
    IEnumerable<MessageCollection>

    An enumeration of all completed sets for the domain.

    IsCompleted(MessageDomain, out MessageCollection)

    Overridden by inheriting classes to see if there is a completed set for the specified domain.

    Declaration
    protected virtual bool IsCompleted(MessageDomain domain, out MessageCollection messageCollection)
    Parameters
    Type Name Description
    MessageDomain domain

    The domain which should be completed.

    MessageCollection messageCollection

    The message collection with the complete set.

    Returns
    Type Description
    Boolean

    true if there is a completed set; otherwise false.

    Push(Message)

    Add a message to the collector.

    Declaration
    public void Push(Message message)
    Parameters
    Type Name Description
    Message message

    The message object to add.

    PushAndContinue(Message, Action<MessageCollection<T1, T2>>, CancellationToken)

    Add a message to the collector and continue with the specified action once the complete set is found.

    Declaration
    public void PushAndContinue(Message message, Action<MessageCollection<T1, T2>> onCollected, CancellationToken cancellationToken = default)
    Parameters
    Type Name Description
    Message message

    The message which is added to the collector.

    Action<MessageCollection<T1, T2>> onCollected

    The action which is executed when the complete set is found.

    CancellationToken cancellationToken

    Cancellation token to stop the continue operation.

    PushAndExecute(Message, Action<MessageCollection<T1, T2>>, CancellationToken)

    Add a message to the collector and wait for the complete set to execute the specified action.

    Declaration
    public bool PushAndExecute(Message message, Action<MessageCollection<T1, T2>> onCollected, CancellationToken cancellationToken = default)
    Parameters
    Type Name Description
    Message message

    The message which is added to the collector.

    Action<MessageCollection<T1, T2>> onCollected

    The action which is executed when the complete set is found.

    CancellationToken cancellationToken

    Cancellation token to stop the wait operation.

    Returns
    Type Description
    Boolean

    true if the action was executed; otherwise false.

    Remarks

    This method is helpful for InterceptorAgents where the agent must wait in the InterceptCore(Message) method for a set of messages before returning the InterceptionAction.

    Another example is when the InterceptorAgent wants to wait for a single message. In this case the first message is the message that is intercepted. The second message is the message the agent needs. The advantage is that the collector respects message domains.

    RemoveMessage(Message)

    Overridden by inheriting classes to remove the message passed to the method.

    Declaration
    protected virtual void RemoveMessage(Message message)
    Parameters
    Type Name Description
    Message message

    The message to remove from the collector.

    TryGetMessageFittingDomain<T>(MessageDomain, ConcurrentDictionary<MessageDomain, MessageStore<T>>, out MessageStore<T>)

    Used to get a message from the message dictionaries.

    Declaration
    protected bool TryGetMessageFittingDomain<T>(MessageDomain domain, ConcurrentDictionary<MessageDomain, MessageStore<T>> messagePool, out MessageStore<T> message)
        where T : Message
    Parameters
    Type Name Description
    MessageDomain domain

    The message domain to get the message for.

    ConcurrentDictionary<MessageDomain, MessageStore<T>> messagePool

    The dictionary for the specific message type.

    MessageStore<T> message

    The message for the domain.

    Returns
    Type Description
    Boolean

    true if the dictionary contains a message for the specific domain; otherwise false.

    Type Parameters
    Name Description
    T

    The type of the message.

    TryPush(Message)

    Tries to add the message to the collector.

    Declaration
    public bool TryPush(Message message)
    Parameters
    Type Name Description
    Message message

    The message object to add.

    Returns
    Type Description
    Boolean

    true if the message was added; otherwise false.

    UpdateMessagePool<T>(T, ConcurrentDictionary<MessageDomain, MessageStore<T>>)

    Add the message to the specified message pool.

    Declaration
    protected void UpdateMessagePool<T>(T message, ConcurrentDictionary<MessageDomain, MessageStore<T>> messagePool)
        where T : Message
    Parameters
    Type Name Description
    T message

    The message to add.

    ConcurrentDictionary<MessageDomain, MessageStore<T>> messagePool

    The message pool to add the message to.

    Type Parameters
    Name Description
    T

    The type of the message.

    © Copyright Tobias Wilker and contributors