22using System . Collections . Concurrent ;
33using System . Collections . Generic ;
44using System . Threading ;
5+ using System . Threading . Tasks ;
56using PCL . Core . Logging ;
67
78namespace PCL . Core . Network ;
89
9- public class Downloader ( int maxParallels = int . MaxValue , int ? refreshInterval = null )
10+ public class Downloader (
11+ int maxParallels = int . MaxValue ,
12+ int ? refreshInterval = null ,
13+ TimeSpan ? timeout = null )
1014{
11- private const string LogModule = "Downloader " ;
15+ private const string LogModule = "Download " ;
1216
1317 private static int _schedulerCount = 0 ;
1418
@@ -18,10 +22,17 @@ public class Downloader(int maxParallels = int.MaxValue, int? refreshInterval =
1822 public static int SchedulerCount => _schedulerCount ;
1923
2024 /// <summary>
21- /// 调度器默认刷新间隔。当没有为一个调度器指定 <see cref="RefreshInterval"/> 时,将使用此全局默认值。
25+ /// 调度器默认刷新间隔。<br/>
26+ /// 当没有为一个调度器指定 <see cref="RefreshInterval"/> 时,将使用此全局默认值。
2227 /// </summary>
2328 public static int DefaultRefreshInterval { get ; set ; } = 500 ;
2429
30+ /// <summary>
31+ /// 任务分片默认超时。若某个分片超时未传递任何数据将会重启该分片,并减少一次重试次数。<br/>
32+ /// 当未指定 <see cref="Timeout"/> 时,将使用此全局默认值。
33+ /// </summary>
34+ public static TimeSpan DefaultTimeout { get ; set ; } = TimeSpan . FromSeconds ( 5 ) ;
35+
2536 /// <summary>
2637 /// 限制全局最大并发数量。该数量由所有实例共享,更改后将于下一个任务分块生效。
2738 /// </summary>
@@ -50,6 +61,29 @@ public bool ClearRefreshInterval()
5061 return true ;
5162 }
5263
64+ private TimeSpan ? _timeout = timeout ;
65+
66+ /// <summary>
67+ /// 任务分片超时。该超时仅对该实例生效,未指定值时,将使用全局默认值 <see cref="DefaultTimeout"/>。<br/>
68+ /// 使用 <see cref="ClearTimeout"/> 可使此值变为未指定状态。
69+ /// </summary>
70+ public TimeSpan Timeout
71+ {
72+ get => _timeout ?? DefaultTimeout ;
73+ set => _timeout = value ;
74+ }
75+
76+ /// <summary>
77+ /// 清除任务分片超时。清除后将回到未指定状态,并使用全局默认值。
78+ /// </summary>
79+ /// <returns>若原值为未指定状态则返回 <c>false</c>,否则返回 <c>true</c></returns>
80+ public bool ClearTimeout ( )
81+ {
82+ if ( _timeout == null ) return false ;
83+ _timeout = null ;
84+ return true ;
85+ }
86+
5387 /// <summary>
5488 /// 限制最大并发数量。该数量仅作用于该实例,更改后将于下一个任务分块生效。
5589 /// </summary>
@@ -60,39 +94,68 @@ public bool ClearRefreshInterval()
6094 /// </summary>
6195 public int SchedulerId { get ; } = ++ _schedulerCount ;
6296
63- private ConcurrentQueue < DownloadItem > _downloadQueue = [ ] ;
97+ /// <summary>
98+ /// 正在进行的下载项。
99+ /// </summary>
100+ public DownloadItem [ ] RunningItems { get ; private set ; } = [ ] ;
101+
102+ private readonly ConcurrentQueue < DownloadItem > _downloadQueue = [ ] ;
64103
65104 private Thread ? _schedulerThread ;
66105
106+ /// <summary>
107+ /// 添加一个下载项。
108+ /// </summary>
109+ /// <param name="item">要添加的下载项</param>
67110 public void AddItem ( DownloadItem item )
68111 {
69- LogWrapper . Trace ( LogModule , $ "#{ SchedulerId } 新增任务 : { item } ") ;
112+ LogWrapper . Trace ( LogModule , $ "#{ SchedulerId } 新增项目 : { item } ") ;
70113 _downloadQueue . Enqueue ( item ) ;
71114 }
115+
116+ private CancellationTokenSource ? _cancelTokenSource ;
117+
118+ /// <summary>
119+ /// </summary>
120+ /// <returns>若调度器未开始运行则返回 <c>false</c>,否则返回 <c>true</c></returns>
121+ public bool Cancel ( )
122+ {
123+ if ( _cancelTokenSource == null ) return false ;
124+ _cancelTokenSource . Cancel ( ) ;
125+ return true ;
126+ }
72127
73128 private int _currentParallelCount = 0 ;
74129
75130 private bool _CanStartNewParallel =>
76131 _currentParallelCount < MaxParallels && _currentParallelCount < ParallelTaskLimit ;
77132
133+ /// <summary>
134+ /// 启动调度器
135+ /// </summary>
136+ /// <param name="cancelToken">取消操作通知</param>
137+ /// <exception cref="InvalidOperationException"></exception>
78138 public void Start ( CancellationToken cancelToken = default )
79139 {
80140 if ( _schedulerThread != null ) throw new InvalidOperationException ( "Cannot start twice" ) ;
141+ _cancelTokenSource = CancellationTokenSource . CreateLinkedTokenSource ( cancelToken ) ;
142+ var cToken = _cancelTokenSource . Token ;
81143 _schedulerThread = new Thread ( ( ) =>
82144 {
83145 var dequeued = new List < DownloadItem > ( ) ;
84- while ( ! cancelToken . IsCancellationRequested )
146+ while ( ! cToken . IsCancellationRequested )
85147 {
86148 if ( _CanStartNewParallel && _downloadQueue . TryDequeue ( out var item ) )
87149 {
150+ if ( item . Status is DownloadItemStatus . Cancelled or DownloadItemStatus . Success ) continue ;
88151 dequeued . Add ( item ) ;
89- if ( item . Status == DownloadItemStatus . Waiting ) _StartNewItem ( item ) ;
90- // TODO more processing
152+ if ( item . Status is not DownloadItemStatus . Starting ) _StartNewParallel ( item , cToken ) ;
91153 }
92154 else
93155 {
94- if ( cancelToken . IsCancellationRequested ) break ;
95- Thread . Sleep ( RefreshInterval ) ;
156+ try { Task . Delay ( RefreshInterval , cToken ) . Wait ( cToken ) ; }
157+ catch ( Exception ) { break ; }
158+ RunningItems = dequeued . ToArray ( ) ;
96159 dequeued . ForEach ( i => _downloadQueue . Enqueue ( i ) ) ;
97160 dequeued . Clear ( ) ;
98161 }
@@ -104,8 +167,62 @@ public void Start(CancellationToken cancelToken = default)
104167 _schedulerThread . Start ( ) ;
105168 }
106169
107- private void _StartNewItem ( DownloadItem item )
170+ private void _RunParallelTask ( Task task )
108171 {
109- throw new NotImplementedException ( ) ;
172+ _currentParallelCount ++ ;
173+ // TODO limit threads
174+ Task . Run ( async ( ) =>
175+ {
176+ try { await task ; }
177+ catch ( Exception ex ) { LogWrapper . Error ( ex , LogModule , "下载任务执行出错" ) ; }
178+ finally { _currentParallelCount -- ; }
179+ } ) ;
180+ }
181+
182+ private void _StartNewParallel ( DownloadItem item , CancellationToken cToken )
183+ {
184+ Task ? task = null ;
185+ if ( item . Status == DownloadItemStatus . Waiting )
186+ {
187+ // 开始第一个分片 (也可能只有这一个 即不分片)
188+ task = item . NewSegment ( 0 , null , ErrorCallback ) ;
189+ }
190+ else for (
191+ var currentNode = item . Segments . First ;
192+ currentNode != null && ! cToken . IsCancellationRequested ;
193+ currentNode = currentNode . Next )
194+ {
195+ // 分析正在运行的分片
196+ var seg = currentNode . Value ;
197+ if ( seg . Status != DownloadSegmentStatus . Running ) continue ;
198+ if ( seg . CurrentChunkStartTime - DateTime . Now > Timeout )
199+ {
200+ // 超时重试
201+ task = item . RestartSegment ( currentNode , true ) ;
202+ break ;
203+ }
204+ // 估计下载速度 防止重复下载同样的内容
205+ var transferLengthByTimeout = seg . ChunkSize / seg . LastChunkElapsedTime . Ticks * Timeout . Ticks ;
206+ if ( seg . RemainingLength <= transferLengthByTimeout ) continue ;
207+ // 开始新的分片
208+ var start = ( seg . NextPosition + seg . EndPosition ) / 2 ;
209+ var end = seg . EndPosition ;
210+ task = item . NewSegment ( start , end , ErrorCallback , currentNode ) ;
211+ break ;
212+ }
213+ if ( task != null ) _RunParallelTask ( task ) ;
214+ return ;
215+
216+ void ErrorCallback ( DownloadSegmentStatus status , Exception ? lastException )
217+ {
218+ if ( status == DownloadSegmentStatus . FailedNotSupportRange )
219+ {
220+ item . TrySegment = false ;
221+ return ;
222+ }
223+ var statusCode = ( int ) status ;
224+ LogWrapper . Warn ( lastException , LogModule , $ "下载失败 ({ statusCode } ): { item } ") ;
225+ item . Cancel ( true ) ;
226+ }
110227 }
111228}
0 commit comments