MessageQueueItemServerFunctions.cs
3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
using System;
using System.Collections.Generic;
using System.Linq;
using Sungero.Core;
using Sungero.CoreEntities;
using Sungero.ExchangeCore.MessageQueueItem;
namespace Sungero.ExchangeCore.Server
{
partial class MessageQueueItemFunctions
{
/// <summary>
/// Получить корневой элемент очереди.
/// </summary>
/// <returns>Корневой элемент очереди.</returns>
[Public]
public virtual IMessageQueueItem GetRootMessageQueueItem()
{
if (_obj.IsRootMessage == true)
return _obj;
return Sungero.ExchangeCore.MessageQueueItems.GetAll(q => Equals(q.RootBox, _obj.RootBox) &&
string.Equals(q.RootMessageId, _obj.RootMessageId) && q.IsRootMessage == true).FirstOrDefault();
}
/// <summary>
/// Увеличить количество попыток переповтора.
/// </summary>
/// <param name="maxRetriesCount">Максимальное количество переповторов.</param>
[Public]
public virtual void IncrementRetries(int maxRetriesCount)
{
_obj.Retries += 1;
if (_obj.Retries >= maxRetriesCount)
{
_obj.ProcessingStatus = Sungero.ExchangeCore.MessageQueueItem.ProcessingStatus.Suspended;
_obj.Note = "Exceeded maximum count attempts to process message. Message queue item was automatically suspended.";
Sungero.Exchange.PublicFunctions.Module.LogDebugFormat(_obj, string.Format("Exceeded maximum count attempts to process message. Message queue item was automatically suspended. Retries {0}.", _obj.Retries));
}
_obj.Save();
}
/// <summary>
/// Определить, нужно ли прекращать обработку элемента очереди, если сессия загрузки исторических сообщений прекращена.
/// </summary>
/// <returns>True - если нужно ли прекращать обработку элемента очереди, иначе - false.</returns>
[Public]
public virtual bool NeedAbortHistoricalQueueItem()
{
if (_obj.DownloadSession == null)
return false;
var session = Sungero.ExchangeCore.HistoricalMessagesDownloadSessions.Get(_obj.DownloadSession.Id);
if (_obj.IsManualRestart != true && session.DownloadingState == ExchangeCore.HistoricalMessagesDownloadSession.DownloadingState.Aborted)
return true;
return false;
}
/// <summary>
/// Прекратить обработку элемента очереди при загрузке исторического сообщения.
/// </summary>
[Public]
public virtual void AbortHistoricalQueueItem()
{
if (_obj.DownloadSession == null)
return;
var message = "Historical message download session was aborted. Message queue item was automatically suspended";
_obj.ProcessingStatus = Sungero.ExchangeCore.MessageQueueItem.ProcessingStatus.Suspended;
_obj.Note = message;
_obj.Save();
Sungero.Exchange.PublicFunctions.Module.LogDebugFormat(_obj, string.Format(message + " AsyncHandlerId: {0}.", _obj.AsyncHandlerId));
}
/// <summary>
/// Определить, нужно ли увеличить счетчик повторов обработки элемента очереди.
/// </summary>
/// <returns>True - если нужно увеличить счетчик повторов, иначе - false.</returns>
[Public]
public virtual bool NeedIncrementRetries()
{
var counterpartyExternalId = _obj.CounterpartyExternalId;
if (string.IsNullOrEmpty(counterpartyExternalId))
return true;
return Parties.Counterparties.GetAll(c => c.ExchangeBoxes.Any(e => Equals(e.OrganizationId, counterpartyExternalId) && Equals(_obj.RootBox, e.Box))).Any();
}
}
}