Class MessageGate<TStart, TEnd>
This is a helper class which allows to collect a message pair consisting of a start and an end message.
Namespace: Agents.Net
Assembly: Agents.Net.dll
Syntax
public class MessageGate<TStart, TEnd> : object where TStart : Message where TEnd : Message
Type Parameters
Name | Description |
---|---|
TStart | Type of the start message. |
TEnd | Type of the end message. |
Remarks
This helper should be used with caution as it breaks the basic concept of the agent framework, that each agent does exactly one thing. There are only four use cases were this helper should be used.
- An agent serving as a legacy bridge:
When working in a legacy system it is sometimes necessary to have an agent serving a service interface. In this case it is often so that the service call provides some parameters and expects a specific result.
To achieve that the service agent needs to wait inside the service call for a specific end message. In addition to that, the service call need to terminate once an exception message was send. It would wait forever otherwise. Use SendAndAwait(TStart, Action<Message>, Int32, CancellationToken) for that use case. - When a single agent's task is to handle message pairs:
This is necessary for example when handling transactions. In this case the gate can be used to determine whether to rollback the transaction or not. Another example would be when an InterceptorAgent that delays the execution of a message so that an injected message chain can be executed. Usefor that use case. - When an agent sends multiple messages of the same type and aggregates them:
This is explicit parallelization. The work can be split in batches to execute them parallel. The batches can be as small as a single unit of work. This is almost as effective as creating batches based on the amount of available CPU Cores. Use SendAndAggregate(IReadOnlyCollection<TStart>, Action<Message>) for that use case. - Calling a simple basic operation inside the agent framework:
Let's assume the following use case: The application has a file system abstraction which can execute CRUD operations on files and directories. Now I want to create an agent, that creates a new configuration file.
Without this class the solution would look like this:
The--------------- ------------- -------------------------- | ConfigCreator | ----> | FileCreator | -----> | ConfigFileCreatedWatcher | --------------- ------------- -------------------------- FileCreating FileCreated ConfigFileCreated
ConfigCreator
would only create theFileCreating
message and mark it somehow for theConfigFileCreatedWatcher
which only marks theFileCreated
message. This would unnecessarily increase the amount of agents in the system.
With this class the solution would look like this:
Similar are operations such as executing an external process or database operations are more examples were the second use case would helpful. Use--------------- ------------- | ConfigCreator | <----> | FileCreator | --------------- ------------- FileCreating FileCreated ConfigFileCreated
for that use case.
Internally this helper uses MessageCollector<T1, T2>s to execute the boilerplate code. Therefore all information regarding the MessageCollector<T1, T2> applies here to, such as considering message domains.
Speaking of domains. The class will internally use a message domain to mark the start message. It is not necessary to do this by the calling code.
Examples
This example shows the first use case of a legacy service call.
[Consumes(typeof(TEnd))]
[Consumes(typeof(ExceptionMessage))]
[Produces(typeof(TStart))]
public class ServiceAgentImplementation : Agent, IService
{
private readonly MessageGate<TStart,TEnd> gate = new MessageGate<TStart,TEnd>();
public ServiceAgentImplementation(IMessageBoard messageBoard) : base(messageBoard)
{
}
protected override void ExecuteCore(Message messageData)
{
gate.Check(messageData);
}
public TResult ServiceCall(TParam parameters)
{
MessageGateResult<TEnd> result = gate.SendAndAwait(parameters, OnMessage);
//evaluate result and return TResult
}
}
Fields
| Improve this Doc View SourceNoTimout
A constant value to tell the MessageGate<TStart, TEnd> that it has to wait forever.
Declaration
public const int NoTimout = null
Field Value
Type | Description |
---|---|
Int32 |
Methods
| Improve this Doc View SourceCheck(Message)
Checks whether the provided exception message is the end message or an exception message for the awaited SendAndAwait(TStart, Action<Message>, Int32, CancellationToken) operation.
Declaration
public bool Check(Message message)
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message to check. |
Returns
Type | Description |
---|---|
Boolean |
|
Remarks
For an example how to use this class see the type documentation.
SendAndAggregate(IReadOnlyCollection<TStart>, Action<Message>)
The method to send the start messages and aggregate all end messages in a MessagesAggregated<T> message.
Declaration
public void SendAndAggregate(IReadOnlyCollection<TStart> startMessages, Action<Message> onMessage)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<TStart> | startMessages | The start messages to send. |
Action<Message> | onMessage | The action to send the message. |
Remarks
When a batch of messages is published using the this class they can be united again when all last messages of the execution chain are of the same type or an exception message.
-------------- --------------------- -----------------
| SplitMessage | ----> | IntermediateMessage | -----> | FinishedMessage |
-------------- --------------------- | -----------------
|
| ------------------
*----> | ExceptionMessage |
| ------------------
|
| ------------------
----> | OtherEndMessage |
------------------
Looking at the example above it would not be possible to unite the SplitMessages
again using this class as at least one IntermediateMessage
let to an OtherEndMessage
.
[Consumes(typeof(FinishedMessage))]
[Consumes(typeof(ExceptionMessage))]
[Produces(typeof(StartMessage))]
public class MessageAggregatorAgent : Agent
{
private readonly MessageGate<FinishedMessage> gate = new MessageGate<FinishedMessage>();
public MessageAggregatorAgent(IMessageBoard messageBoard) : base(messageBoard)
{
}
protected override void ExecuteCore(Message messageData)
{
if(gate.Check(messageData))
{
return;
}
//create startMessages
gate.SendAndAggregate(startMessages, OnMessage);
}
}
SendAndAwait(TStart, Action<Message>, Int32, CancellationToken)
The method to send a start message and wait for the end message.
Declaration
public MessageGateResult<TEnd> SendAndAwait(TStart startMessage, Action<Message> onMessage, int timeout = null, CancellationToken cancellationToken = null)
Parameters
Type | Name | Description |
---|---|---|
TStart | startMessage | The start message. |
Action<Message> | onMessage | The action to send the message. |
Int32 | timeout | Optionally a timeout after which the method will return, without sending the result. By default the timeout is NoTimout |
CancellationToken | cancellationToken | Optionally a cancellation token to cancel the wait operation. This is helpful, when for example the imitated service call is an async method. By default no CancellationToken will be used. |
Returns
Type | Description |
---|---|
MessageGateResult<TEnd> | The MessageGateResult<TEnd> of the operation. |
Remarks
For an example how to use this class see the type documentation.
WARNING: Extensive use of this method will lead to time gaps in the execution. See .net issue on github: https://github.com/dotnet/runtime/issues/55562
Use this method only for the legacy service call. Of all other scenarios use
SendAndContinue(TStart, Action<Message>, Action<MessageGateResult<TEnd>>, Int32, CancellationToken)
The method to send a start message and wait for the end message.
Declaration
public void SendAndContinue(TStart startMessage, Action<Message> onMessage, Action<MessageGateResult<TEnd>> continueAction, int timeout = null, CancellationToken cancellationToken = null)
Parameters
Type | Name | Description |
---|---|---|
TStart | startMessage | The start message. |
Action<Message> | onMessage | The action to send the message. |
Action<MessageGateResult<TEnd>> | continueAction | The action to execute once a MessageGateResult<TEnd> was created. |
Int32 | timeout | Optionally a timeout after which the method will return, without sending the result. By default the timeout is NoTimout |
CancellationToken | cancellationToken | Optionally a cancellation token to cancel the continue operation. By default no CancellationToken will be used. |
Remarks
For an example how to use this class see the type documentation.
SendAndContinue(IReadOnlyCollection<TStart>, Action<Message>, Action<MessageAggregationResult<TEnd>>, Int32, CancellationToken)
The method to send the start messages and wait for all end messages.
Declaration
public void SendAndContinue(IReadOnlyCollection<TStart> startMessages, Action<Message> onMessage, Action<MessageAggregationResult<TEnd>> onAggregated, int timeout = null, CancellationToken cancellationToken = null)
Parameters
Type | Name | Description |
---|---|---|
IReadOnlyCollection<TStart> | startMessages | The start messages to send. |
Action<Message> | onMessage | The action to send the message. |
Action<MessageAggregationResult<TEnd>> | onAggregated | The action to execute once a MessageAggregationResult<TEnd> was created. |
Int32 | timeout | Optionally a timeout after which the method will return, without sending the result. By default the timeout is NoTimout |
CancellationToken | cancellationToken | Optionally a cancellation token to cancel the continue operation. By default no CancellationToken will be used. |
Remarks
When a batch of messages is published using the this class they can be united again when all last messages of the execution chain are of the same type or an exception message.
-------------- --------------------- -----------------
| SplitMessage | ----> | IntermediateMessage | -----> | FinishedMessage |
-------------- --------------------- | -----------------
|
| ------------------
*----> | ExceptionMessage |
| ------------------
|
| ------------------
----> | OtherEndMessage |
------------------
Looking at the example above it would not be possible to unite the SplitMessages
again using this class as at least one IntermediateMessage
let to an OtherEndMessage
.
This function is useful when the aggregated end messages need to be modified - for example filtered - before aggregating them. In all other cases it is better to use SendAndAggregate(IReadOnlyCollection<TStart>, Action<Message>) to automatically create and send an aggregated message.
[Consumes(typeof(FinishedMessage))]
[Consumes(typeof(ExceptionMessage))]
[Produces(typeof(StartMessage))]
[Produces(typeof(AggregatedMessage))]
public class MessageAggregatorAgent : Agent
{
private readonly MessageGate<FinishedMessage> gate = new MessageGate<FinishedMessage>();
public MessageAggregatorAgent(IMessageBoard messageBoard) : base(messageBoard)
{
}
protected override void ExecuteCore(Message messageData)
{
if(gate.Check(messageData))
{
return;
}
//create startMessages
gate.SendAndContinue(startMessages, OnMessage, result =>
{
//manipulate the results and produce aggregated message
OnMessage(aggregatedMessage);
});
}
}