
    Class MessageGate<TStart, TEnd>

    This helper class collects a message pair consisting of a start message and an end message.

    Inheritance
    Object
    MessageGate<TStart, TEnd>
    Namespace: Agents.Net
    Assembly: Agents.Net.dll
    Syntax
    public class MessageGate<TStart, TEnd> : object where TStart : Message where TEnd : Message
    Type Parameters
    Name Description
    TStart

    Type of the start message.

    TEnd

    Type of the end message.

    Remarks

    This helper should be used with caution because it breaks the basic concept of the agent framework that each agent does exactly one thing. There are only four use cases where this helper should be used.

    1. An agent serving as a legacy bridge:
      When working in a legacy system, it is sometimes necessary to have an agent serve a service interface. In this case the service call often provides some parameters and expects a specific result.
      To achieve that, the service agent needs to wait inside the service call for a specific end message. In addition, the service call needs to terminate once an exception message was sent; otherwise it would wait forever. Use SendAndAwait(TStart, Action<Message>, Int32, CancellationToken) for that use case.
    2. When a single agent's task is to handle message pairs:
      This is necessary, for example, when handling transactions. In this case the gate can be used to determine whether to roll back the transaction or not. Another example is an InterceptorAgent that delays the execution of a message so that an injected message chain can be executed. Use for that use case.
    3. When an agent sends multiple messages of the same type and aggregates them:
      This is explicit parallelization. The work can be split into batches that are executed in parallel. The batches can be as small as a single unit of work. This is almost as effective as creating batches based on the number of available CPU cores. Use SendAndAggregate(IReadOnlyCollection<TStart>, Action<Message>) for that use case.
    4. Calling a simple basic operation inside the agent framework:
      Let's assume the following use case: the application has a file system abstraction which can execute CRUD operations on files and directories. Now I want to create an agent that creates a new configuration file.
      Without this class the solution would look like this:
       ---------------         -------------          --------------------------
      | ConfigCreator | ----> | FileCreator | -----> | ConfigFileCreatedWatcher |
       ---------------         -------------          --------------------------
      FileCreating             FileCreated            ConfigFileCreated
      The ConfigCreator would only create the FileCreating message and mark it somehow for the ConfigFileCreatedWatcher, which only marks the FileCreated message. This would unnecessarily increase the number of agents in the system.
      With this class the solution would look like this:
       ---------------          -------------
      | ConfigCreator | <----> | FileCreator |
       ---------------          -------------
      FileCreating             FileCreated
      ConfigFileCreated
      Similar operations, such as executing an external process or database operations, are further examples where this helper would be useful. Use for that use case.
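    The batching idea from use case 3 can be sketched with plain .NET collections. This is a minimal sketch, not part of Agents.Net: BatchSketch and Split are hypothetical names, and how a batch is wrapped into a TStart message is left to the calling agent. A batch size of 1 corresponds to the "single unit of work" case mentioned above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of Agents.Net.
public static class BatchSketch
{
    // Splits the work items into consecutive batches of at most batchSize items.
    public static IReadOnlyCollection<IReadOnlyList<T>> Split<T>(IReadOnlyList<T> items, int batchSize)
    {
        if (batchSize < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        }

        var batches = new List<IReadOnlyList<T>>();
        for (int start = 0; start < items.Count; start += batchSize)
        {
            // The last batch may be smaller than batchSize.
            int count = Math.Min(batchSize, items.Count - start);
            var batch = new List<T>(count);
            for (int i = 0; i < count; i++)
            {
                batch.Add(items[start + i]);
            }
            batches.Add(batch);
        }
        return batches;
    }
}
```

    Each resulting batch would then become the payload of one start message in the collection passed to SendAndAggregate.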

    Internally this helper uses MessageCollector<T1, T2> instances to execute the boilerplate code. Therefore all information regarding the MessageCollector<T1, T2> applies here too, such as considering message domains.

    Regarding domains: the class internally uses a message domain to mark the start message. The calling code does not need to do this.

    Examples

    This example shows the first use case of a legacy service call.

    [Consumes(typeof(TEnd))]
    [Consumes(typeof(ExceptionMessage))]
    [Produces(typeof(TStart))]
    public class ServiceAgentImplementation : Agent, IService
    {
        private readonly MessageGate<TStart,TEnd> gate = new MessageGate<TStart,TEnd>();
    
        public ServiceAgentImplementation(IMessageBoard messageBoard) : base(messageBoard)
        {
        }
    
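        protected override void ExecuteCore(Message messageData)
        {
            // Hand every consumed message to the gate so it can detect the end or exception message.
            gate.Check(messageData);
        }
    
        public TResult ServiceCall(TParam parameters)
        {
            // TStart, TEnd, TParam and TResult are placeholders for concrete types.
            // The call waits until the end message or an exception message arrives.
            MessageGateResult<TEnd> result = gate.SendAndAwait(parameters, OnMessage);
            //evaluate result and return TResult
        }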
    }

    Fields


    NoTimout

    A constant value to tell the MessageGate<TStart, TEnd> that it has to wait forever.

    Declaration
    public const int NoTimout = null
    Field Value
    Type Description
    Int32

    Methods


    Check(Message)

    Checks whether the provided message is the end message or an exception message for the awaited SendAndAwait(TStart, Action<Message>, Int32, CancellationToken) operation.

    Declaration
    public bool Check(Message message)
    Parameters
    Type Name Description
    Message message

    The message to check.

    Returns
    Type Description
    Boolean

    true, if the message was the end message or an exception message for the SendAndAwait(TStart, Action<Message>, Int32, CancellationToken) operation.

    Remarks

    For an example of how to use this class, see the type documentation.


    SendAndAggregate(IReadOnlyCollection<TStart>, Action<Message>)

    The method to send the start messages and aggregate all end messages in a MessagesAggregated<T> message.

    Declaration
    public void SendAndAggregate(IReadOnlyCollection<TStart> startMessages, Action<Message> onMessage)
    Parameters
    Type Name Description
    IReadOnlyCollection<TStart> startMessages

    The start messages to send.

    Action<Message> onMessage

    The action to send the message.

    Remarks

    When a batch of messages is published using this class, they can be united again only if the last message of every execution chain is either of the same type or an exception message.

     --------------         ---------------------          -----------------
    | SplitMessage | ----> | IntermediateMessage | -----> | FinishedMessage |
     --------------         ---------------------  |       -----------------
                                                   |
                                                   |       ------------------
                                                   *----> | ExceptionMessage |
                                                   |       ------------------
                                                   |
                                                   |       ------------------
                                                    ----> | OtherEndMessage  |
                                                           ------------------

    In the example above it would not be possible to unite the SplitMessages again using this class, because at least one IntermediateMessage led to an OtherEndMessage.
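    The completion condition described above can be modelled with a simple counter. This is a minimal sketch under stated assumptions: AggregationSketch is a hypothetical stand-in, not the implementation used by MessageGate<TStart, TEnd>, and it uses plain strings and exceptions instead of messages. It is also not thread-safe. It only illustrates why a chain ending in an unexpected message type keeps the aggregation from ever completing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for illustration; not part of Agents.Net.
public sealed class AggregationSketch
{
    private readonly int expectedChains;
    private readonly List<string> finished = new List<string>();
    private readonly List<Exception> failed = new List<Exception>();

    public AggregationSketch(int expectedChains)
    {
        this.expectedChains = expectedChains;
    }

    // Called when a chain ends with the awaited end message type.
    public void OnFinished(string payload) => finished.Add(payload);

    // Called when a chain ends with an exception message.
    public void OnException(Exception exception) => failed.Add(exception);

    // A chain that ends with any other message type is never reported here,
    // so the sum below cannot reach the expected number of chains.
    public bool IsComplete => finished.Count + failed.Count == expectedChains;
}
```

    With three started chains, two finished results plus one exception complete the aggregation, while two finished results plus one chain ending in another message type leave IsComplete at false.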

    Here is a typical example of how to set up and use this method:
    [Consumes(typeof(FinishedMessage))]
    [Consumes(typeof(ExceptionMessage))]
    [Produces(typeof(StartMessage))]
    public class MessageAggregatorAgent : Agent
    {
        // The gate requires both the start and the end message type.
        private readonly MessageGate<StartMessage, FinishedMessage> gate = new MessageGate<StartMessage, FinishedMessage>();
    
        public MessageAggregatorAgent(IMessageBoard messageBoard) : base(messageBoard)
        {
        }
    
        protected override void ExecuteCore(Message messageData)
        {
            // End and exception messages that belong to the gate are consumed by it.
            if (gate.Check(messageData))
            {
                return;
            }
            //create startMessages
            gate.SendAndAggregate(startMessages, OnMessage);
        }
    }


    SendAndAwait(TStart, Action<Message>, Int32, CancellationToken)

    The method to send a start message and wait for the end message.

    Declaration
    public MessageGateResult<TEnd> SendAndAwait(TStart startMessage, Action<Message> onMessage, int timeout = NoTimout, CancellationToken cancellationToken = default)
    Parameters
    Type Name Description
    TStart startMessage

    The start message.

    Action<Message> onMessage

    The action to send the message.

    Int32 timeout

    Optional timeout after which the method returns without sending the result. By default the timeout is NoTimout.

    CancellationToken cancellationToken

    Optional cancellation token to cancel the wait operation. This is helpful when, for example, the imitated service call is an async method. By default no CancellationToken is used.

    Returns
    Type Description
    MessageGateResult<TEnd>

    The MessageGateResult<TEnd> of the operation.

    Remarks

    For an example of how to use this class, see the type documentation.

    WARNING: Extensive use of this method will lead to time gaps in the execution (see the .NET issue on GitHub: https://github.com/dotnet/runtime/issues/55562). Use this method only for the legacy service call. For all other scenarios use .
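    The three ways a wait with the timeout and cancellationToken parameters can end may be illustrated with standard .NET primitives. This is a minimal sketch under stated assumptions: WaitSketch and WaitOutcome are hypothetical names, and the sketch does not show how MessageGate<TStart, TEnd> is implemented internally. It relies only on ManualResetEventSlim.Wait(Int32, CancellationToken), which returns false on timeout and throws OperationCanceledException on cancellation.

```csharp
using System;
using System.Threading;

// Hypothetical outcome type for illustration; not part of Agents.Net.
public enum WaitOutcome { Completed, TimedOut, Canceled }

public static class WaitSketch
{
    // Blocks until the event is set, the timeout elapses, or the token is canceled.
    // Timeout.Infinite (-1) is the standard .NET value for waiting without a timeout.
    public static WaitOutcome Await(ManualResetEventSlim endArrived, int timeoutMilliseconds, CancellationToken token)
    {
        try
        {
            return endArrived.Wait(timeoutMilliseconds, token)
                ? WaitOutcome.Completed
                : WaitOutcome.TimedOut;
        }
        catch (OperationCanceledException)
        {
            return WaitOutcome.Canceled;
        }
    }
}
```

    Because the calling thread is blocked for the whole wait, a sketch like this also shows why a blocking call is best reserved for the legacy service scenario.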


    SendAndContinue(TStart, Action<Message>, Action<MessageGateResult<TEnd>>, Int32, CancellationToken)

    The method to send a start message and execute a continuation action once the end message arrives.

    Declaration
    public void SendAndContinue(TStart startMessage, Action<Message> onMessage, Action<MessageGateResult<TEnd>> continueAction, int timeout = NoTimout, CancellationToken cancellationToken = default)
    Parameters
    Type Name Description
    TStart startMessage

    The start message.

    Action<Message> onMessage

    The action to send the message.

    Action<MessageGateResult<TEnd>> continueAction

    The action to execute once a MessageGateResult<TEnd> was created.

    Int32 timeout

    Optional timeout after which the method returns without sending the result. By default the timeout is NoTimout.

    CancellationToken cancellationToken

    Optionally a cancellation token to cancel the continue operation. By default no CancellationToken will be used.

    Remarks

    For an example of how to use this class, see the type documentation.


    SendAndContinue(IReadOnlyCollection<TStart>, Action<Message>, Action<MessageAggregationResult<TEnd>>, Int32, CancellationToken)

    The method to send the start messages and wait for all end messages.

    Declaration
    public void SendAndContinue(IReadOnlyCollection<TStart> startMessages, Action<Message> onMessage, Action<MessageAggregationResult<TEnd>> onAggregated, int timeout = NoTimout, CancellationToken cancellationToken = default)
    Parameters
    Type Name Description
    IReadOnlyCollection<TStart> startMessages

    The start messages to send.

    Action<Message> onMessage

    The action to send the message.

    Action<MessageAggregationResult<TEnd>> onAggregated

    The action to execute once a MessageAggregationResult<TEnd> was created.

    Int32 timeout

    Optional timeout after which the method returns without sending the result. By default the timeout is NoTimout.

    CancellationToken cancellationToken

    Optionally a cancellation token to cancel the continue operation. By default no CancellationToken will be used.

    Remarks

    When a batch of messages is published using this class, they can be united again only if the last message of every execution chain is either of the same type or an exception message.

     --------------         ---------------------          -----------------
    | SplitMessage | ----> | IntermediateMessage | -----> | FinishedMessage |
     --------------         ---------------------  |       -----------------
                                                   |
                                                   |       ------------------
                                                   *----> | ExceptionMessage |
                                                   |       ------------------
                                                   |
                                                   |       ------------------
                                                    ----> | OtherEndMessage  |
                                                           ------------------

    In the example above it would not be possible to unite the SplitMessages again using this class, because at least one IntermediateMessage led to an OtherEndMessage.

    This method is useful when the end messages need to be modified, for example filtered, before aggregating them. In all other cases it is better to use SendAndAggregate(IReadOnlyCollection<TStart>, Action<Message>) to automatically create and send an aggregated message.

    This is an example of how to use this method correctly:
    [Consumes(typeof(FinishedMessage))]
    [Consumes(typeof(ExceptionMessage))]
    [Produces(typeof(StartMessage))]
    [Produces(typeof(AggregatedMessage))]
    public class MessageAggregatorAgent : Agent
    {
        // The gate requires both the start and the end message type.
        private readonly MessageGate<StartMessage, FinishedMessage> gate = new MessageGate<StartMessage, FinishedMessage>();
    
        public MessageAggregatorAgent(IMessageBoard messageBoard) : base(messageBoard)
        {
        }
    
        protected override void ExecuteCore(Message messageData)
        {
            // End and exception messages that belong to the gate are consumed by it.
            if (gate.Check(messageData))
            {
                return;
            }
            //create startMessages
            gate.SendAndContinue(startMessages, OnMessage, result =>
            {
                //manipulate the results and produce aggregated message
                OnMessage(aggregatedMessage);
            });
        }
    }

    © Copyright Tobias Wilker and contributors