using System; using System.Collections.Concurrent; using System.Linq; using System.Threading; namespace FastGithub.FlowAnalyze { sealed class FlowAnalyzer : IFlowAnalyzer { private const int INTERVAL_SECONDS = 5; private readonly FlowQueues readQueues = new(INTERVAL_SECONDS); private readonly FlowQueues writeQueues = new(INTERVAL_SECONDS); /// /// 收到数据 /// /// /// public void OnFlow(FlowType flowType, int length) { if (flowType == FlowType.Read) { this.readQueues.OnFlow(length); } else { this.writeQueues.OnFlow(length); } } /// /// 获取流量分析 /// /// public FlowStatistics GetFlowStatistics() { return new FlowStatistics { TotalRead = this.readQueues.TotalBytes, TotalWrite = this.writeQueues.TotalBytes, ReadRate = this.readQueues.GetRate(), WriteRate = this.writeQueues.GetRate() }; } private class FlowQueues { private int cleaning = 0; private long totalBytes = 0L; private record QueueItem(long Ticks, int Length); private readonly ConcurrentQueue queues = new(); private readonly int intervalSeconds; public long TotalBytes => this.totalBytes; public FlowQueues(int intervalSeconds) { this.intervalSeconds = intervalSeconds; } public void OnFlow(int length) { Interlocked.Add(ref this.totalBytes, length); this.CleanInvalidRecords(); this.queues.Enqueue(new QueueItem(Environment.TickCount64, length)); } public double GetRate() { this.CleanInvalidRecords(); return (double)this.queues.Sum(item => item.Length) / this.intervalSeconds; } /// /// 清除无效记录 /// /// private bool CleanInvalidRecords() { if (Interlocked.CompareExchange(ref this.cleaning, 1, 0) != 0) { return false; } var ticks = Environment.TickCount64; while (this.queues.TryPeek(out var item)) { if (ticks - item.Ticks < this.intervalSeconds * 1000) { break; } else { this.queues.TryDequeue(out _); } } Interlocked.Exchange(ref this.cleaning, 0); return true; } } } }