如何对一段时间内的流进行分区(GroupBy),并监控Rx中元素是否缺失?前几天我一直在尝试编写一个Rx查询来处理来自源的事件流并检查是否缺少某些ID。缺席被定义为存在一系列时间窗口(例如,从9:00到17:00的所有日期),在此期间最多应该有20分钟没有ID出现在流中。更为复杂的是,缺席时间还要根据身份证件来确定。例如,假设三个事件A、B和C发生在组合事件流(A、A、B、C、A、C、B等)中,可以定义为我认为我需要先划分流以传递GroupBy单独的事件,然后处理生成的具有缺失规则的单独流。我已经在MicrosoftRx论坛上对此进行了一些思考(非常感谢戴夫),我有一些工作代码可以生成规则并进行缺失检查,但我正在努力,例如,如何将其与分组结合起来。所以,不用多说,到目前为止的黑客代码://代表事件的一些样本数据位。公共类FakeData{publicintId{get;放;}publicstringSomeData{get;放;}}//注意DateTime中的Now部分将时钟时间归零并且只有日期。目的是创建开始-结束时间对,例如9:00-17:00。//闹钟开始和结束时间点应该成对匹配,可以是成对的值...varmaxDate=DateTime.Now.Date.AddHours(17).AddMinutes(0).AddSeconds(0).AddDays(14);varstartDate=DateTime.Now.Date.AddHours(9).AddMinutes(0).AddSeconds(0);varalarmStartPeriods=Enumerable.Range(0,1+(maxDate-startDate).Days).Select(d=>newDateTimeOffset(startDate.AddDays(d))).列表();varalarmEndPeriods=Enumerable.Range(0,1+(maxDate-startDate).Days).Select(d=>newDateTimeOffset(startDate.AddDays(d)).AddHours(5)).ToList();并且在不分组的情况下做缺勤查询,这对我来说是一个难点。<编辑:也许我应该将时间点分组并添加一个ID并在查询中使用生成的三元组...dataSource=fromninObservable.Interval(TimeSpan.FromMilliseconds(100))selectnewFakeData{Id=newRandom().Next(1,5),SomeData=DateTimeOffset.Now.ToString()};varstartPointOfTimeChanges=alarmStartPeriods.ToObservable();varendPointOfTimeChanges=alarmEndPeriods.ToObservable();vardurations=startPointOfTimeChanges.CombineLatest(endPointOfTimeChanges开始,结束)=>new{开始,结束});varmaximumInactivityTimeBeforeAlarmSignal=TimeSpan.FromMilliseconds(250);计时器=(从持续时间中的持续时间选择(从Observable.Timer(DateTime.Now)中的_中的dataSource.Throttle(maximumInactivityTimeBeforeAlarmSignal)。TakeUntil(duration.end)选择x))。开关();timer.Subscribe(x=>Debug.WriteLine(x.SomeData));问题:我能想到的任何其他问题都很好(为了我自己的乐趣):):我能想到的另一个选择是显式使用System.Threading.Timers和ConcurrentDictionary,但需要了解更多!关于James键入的答案,这里有一个关于它如何工作以及我打算如何使用它的快速解释。首先,在第一个事件到来之前,可观察对象不会做任何事情。因此,如果要立即开始监视,则需要添加一些额外的Rx功能或触发一个虚拟事件。我相信这不是问题。其次,将从alarmInterval中为任何新ID获取一个新的超时变量。在这里,new甚至是久违的人,已经拉响了警钟。我认为这是可行的,因为人们可以订阅这个observable并产生一些副作用。一些示例可能是设置标志、发送信号以及什么是业务规则。此外,保持适当的锁定等,应该可以很容易地根据预定义的警报规则提供新的时间跨度,并具有单独的缺席和时间窗口。我将不得不研究与此相关的其他概念,以便更好地掌握事物。但我主要关心的是满意度。生活是辉煌的。?编辑-改进代码,简化SelectMany以使用TakeLast。我写了一篇关于检测断开连接的客户端的博客文章-如果您将帖子中的timeToHold变量替换为下面的alarmInterval函数以根据客户端ID获取限制时间,那么这也适用于您的场景。例如://idStream是ID输入流的IObservable//alarmInterval是一个Func,它在给定ID的情况下获取间隔varidAlarmStream=idStream.GroupByUntil(key=>key,grp=>grp.Throttle(alarmInterval(grp.键))).SelectMany(grp=>grp.TakeLast(1));这为您提供了连续监控的基本功能,而无需查看活动监控周期。为了获得监控窗口的功能,我转身并使用WHERE过滤上面的输出,检查发出的ID是否在监控时间窗口内。这使得处理不断变化的监控周期变得更加容易。您可以通过将每个监视窗口转换为流并将其与警报流结合来做更高级的事情,但我不相信额外复杂性的好处。alarmInterval函数还将为您提供动态警报间隔的元素,因为它可以返回新值,但这些只会在该ID的警报失败后生效,从而结束其当前组。--在这里进入一些理论--为了充分激发这一点,你必须以某种方式结束小组--你可以通过几种方式来做到这一点。一种方法是使用Select将idSt投影到包含ID和全局计数器值的自定义类型的流中。为此类型提供适当的相等性实现,以便它可以正确使用GroupByUntil。现在每次更改警报间隔时,更改计数器。这将导致为每个ID创建新组。然后,您可以在最终过滤器中添加额外的检查,以确保输出事件具有最新的计数器值。以上就是C#学习教程:如何对一个流进行分区(GroupBy)并监控一段时间内Rx中元素是否缺失?如果所有分享的内容对你有用,需要进一步了解C#学习教程,希望大家多多关注。本文收集自网络,不代表立场。如涉及侵权,请点击右侧联系管理员删除。如需转载请注明出处:
