Skip to content
This repository was archived by the owner on Jan 25, 2026. It is now read-only.

Commit 98c236f

Browse files
committed
feat(download): 完成下载调度框架
1 parent 3da6500 commit 98c236f

4 files changed

Lines changed: 170 additions & 44 deletions

File tree

LifecycleManagement/Lifecycle.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ void TempHandler(LifecycleState s)
375375
/// </returns>
376376
public static Task<bool> WaitForStateAsync(LifecycleState state)
377377
{
378-
if (CurrentState >= state) return Task.FromResult(false); // 如果已经是目标状态,则直接返回 true
378+
if (CurrentState >= state) return Task.FromResult(false); // 如果已经是目标状态,则直接返回 false
379379
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
380380
StateChanged += TempHandler;
381381
return tcs.Task;

Network/DownloadItem.cs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -54,30 +54,40 @@ public class DownloadItem(
5454

5555
public DownloadItemStatus Status { get; private set; } = DownloadItemStatus.Waiting;
5656

57+
public event Action? Finished;
58+
5759
public long CalculateTransferredLength()
5860
{
5961
lock (Segments) return Segments
6062
.Aggregate(0L, (len, task) => len + task.TransferredLength);
6163
}
6264

6365
public long CalculateRemainingLength() => (ContentLength == 0) ? 0 : ContentLength - CalculateTransferredLength();
66+
67+
private CancellationTokenSource _cancelTokenSource = new();
68+
69+
public bool Cancel(bool markAsFailed = false)
70+
{
71+
if (Status is not DownloadItemStatus.Starting and DownloadItemStatus.Running) return false;
72+
_cancelTokenSource.Cancel();
73+
if (markAsFailed) Status = DownloadItemStatus.Failed;
74+
return true;
75+
}
6476

6577
private int _finishedCount = 0;
6678

6779
private void _ConstructDownloadSegment(
6880
long startPosition,
69-
long endPosition,
81+
long? endPosition,
7082
Action<DownloadSegment>? endCallback,
7183
CancellationToken cancelToken,
84+
int retry,
7285
out Task task,
7386
out DownloadSegment segment)
7487
{
75-
var seg = new DownloadSegment(RealUri, TargetPath, ChunkSize)
76-
{
77-
EndPosition = endPosition,
78-
RetryCount = Retry,
79-
};
80-
if (startPosition == 0 && endPosition != 0)
88+
var seg = new DownloadSegment(RealUri, TargetPath, ChunkSize) { RetryCount = retry };
89+
if (endPosition is { } end) seg.EndPosition = end;
90+
if (startPosition == 0 && endPosition == 0)
8191
{
8292
seg.When(DownloadSegmentStatus.Running, () =>
8393
{
@@ -93,14 +103,17 @@ private void _ConstructDownloadSegment(
93103

94104
public async Task NewSegment(
95105
long startPosition,
96-
long endPosition,
97-
Action finishedCallback,
106+
long? endPosition,
98107
SegmentInterruptHandler errorCallback,
99-
LinkedListNode<DownloadSegment>? afterNode = null,
100-
CancellationToken cancelToken = default)
108+
LinkedListNode<DownloadSegment>? afterNode = null)
101109
{
102-
if (Segments.Count == 0) Status = DownloadItemStatus.Starting;
103-
cancelToken.Register(() => Status = DownloadItemStatus.Cancelled);
110+
var cToken = _cancelTokenSource.Token;
111+
if (Segments.Count == 0)
112+
{
113+
// 初始化
114+
Status = DownloadItemStatus.Starting;
115+
cToken.Register(() => Status = DownloadItemStatus.Cancelled);
116+
}
104117
Task task;
105118
lock (Segments)
106119
{
@@ -112,14 +125,14 @@ public async Task NewSegment(
112125
_finishedCount++;
113126
if (_finishedCount != Segments.Count) return;
114127
Status = DownloadItemStatus.Success;
115-
finishedCallback();
128+
Finished?.Invoke();
116129
}
117130
else if (seg.Status != DownloadSegmentStatus.Cancelled)
118131
{
119132
errorCallback(seg.Status, seg.LastException);
120133
}
121134
}),
122-
cancelToken, out task, out var segment);
135+
cToken, Retry, out task, out var segment);
123136
if (afterNode != null)
124137
{
125138
Segments.AddAfter(afterNode, segment);
@@ -130,7 +143,9 @@ public async Task NewSegment(
130143
await task;
131144
}
132145

133-
public async Task RestartSegment(LinkedListNode<DownloadSegment> node, CancellationToken cancelToken = default)
146+
public async Task RestartSegment(
147+
LinkedListNode<DownloadSegment> node,
148+
bool isRetry = false)
134149
{
135150
Task task;
136151
lock (Segments)
@@ -139,14 +154,11 @@ public async Task RestartSegment(LinkedListNode<DownloadSegment> node, Cancellat
139154
segment.Cancel();
140155
_ConstructDownloadSegment(
141156
segment.StartPosition, segment.EndPosition, segment.EndCallback,
142-
cancelToken, out task, out segment);
157+
_cancelTokenSource.Token, segment.RetryCount, out task, out segment);
143158
node.Value = segment;
144159
}
145160
await task;
146161
}
147162

148-
public override string ToString()
149-
{
150-
return $"[{SourceUri}] -> [{TargetPath}]";
151-
}
163+
public override string ToString() => $"[{SourceUri}] -> [{TargetPath}]";
152164
}

Network/DownloadSegment.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,9 @@ public class DownloadSegment(Uri sourceUri, string targetPath, int chunkSize = 1
119119
public int RetryCount { get; set; } = 3;
120120

121121
/// <summary>
122-
/// 传输总长度。
122+
/// 传输总长度。若来源不支持内容长度,则为 0。
123123
/// </summary>
124-
public long TotalLength => EndPosition - StartPosition + 1;
124+
public long TotalLength => (EndPosition == 0) ? 0 : EndPosition - StartPosition + 1;
125125

126126
/// <summary>
127127
/// 已传输的长度。
@@ -239,7 +239,7 @@ public async Task Start(bool enableRange, long startPosition = 0, CancellationTo
239239
if (EndPosition == 0)
240240
{
241241
var totalRead = 0L;
242-
while (true)
242+
while (!cToken.IsCancellationRequested)
243243
{
244244
var buffer = new byte[ChunkSize];
245245
CurrentChunkStartTime = DateTime.Now;
@@ -256,7 +256,7 @@ public async Task Start(bool enableRange, long startPosition = 0, CancellationTo
256256
else
257257
{
258258
long remaining;
259-
while ((remaining = RemainingLength) > 0)
259+
while (!cToken.IsCancellationRequested && (remaining = RemainingLength) > 0)
260260
{
261261
var count = remaining > ChunkSize ? ChunkSize : (int)remaining;
262262
var buffer = new byte[count];
@@ -270,13 +270,13 @@ public async Task Start(bool enableRange, long startPosition = 0, CancellationTo
270270
}
271271
}
272272
await stream.FlushAsync(cToken);
273-
Status = DownloadSegmentStatus.Success;
273+
if (!cToken.IsCancellationRequested) Status = DownloadSegmentStatus.Success;
274274
break;
275275
}
276276
catch (Exception e)
277277
{
278278
LastException = e;
279-
if (--RetryCount == 0) Status = (DownloadSegmentStatus)failedStatus;
279+
if (--RetryCount == 0) Status = (DownloadSegmentStatus)(failedStatus);
280280
else await Task.Delay(1000, cToken);
281281
}
282282
}
@@ -294,8 +294,5 @@ public bool Cancel()
294294
return true;
295295
}
296296

297-
public override string ToString()
298-
{
299-
return $"[{SourceUri}]({StartPosition}..{EndPosition}) -> [{TargetPath}]";
300-
}
297+
public override string ToString() => $"[{SourceUri}]({StartPosition}..{EndPosition}) -> [{TargetPath}]";
301298
}

Network/Downloader.cs

Lines changed: 129 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Threading;
5+
using System.Threading.Tasks;
56
using PCL.Core.Logging;
67

78
namespace PCL.Core.Network;
89

9-
public class Downloader(int maxParallels = int.MaxValue, int? refreshInterval = null)
10+
public class Downloader(
11+
int maxParallels = int.MaxValue,
12+
int? refreshInterval = null,
13+
TimeSpan? timeout = null)
1014
{
11-
private const string LogModule = "Downloader";
15+
private const string LogModule = "Download";
1216

1317
private static int _schedulerCount = 0;
1418

@@ -18,10 +22,17 @@ public class Downloader(int maxParallels = int.MaxValue, int? refreshInterval =
1822
public static int SchedulerCount => _schedulerCount;
1923

2024
/// <summary>
21-
/// 调度器默认刷新间隔。当没有为一个调度器指定 <see cref="RefreshInterval"/> 时,将使用此全局默认值。
25+
/// 调度器默认刷新间隔。<br/>
26+
/// 当没有为一个调度器指定 <see cref="RefreshInterval"/> 时,将使用此全局默认值。
2227
/// </summary>
2328
public static int DefaultRefreshInterval { get; set; } = 500;
2429

30+
/// <summary>
31+
/// 任务分片默认超时。若某个分片超时未传递任何数据将会重启该分片,并减少一次重试次数。<br/>
32+
/// 当未指定 <see cref="Timeout"/> 时,将使用此全局默认值。
33+
/// </summary>
34+
public static TimeSpan DefaultTimeout { get; set; } = TimeSpan.FromSeconds(5);
35+
2536
/// <summary>
2637
/// 限制全局最大并发数量。该数量由所有实例共享,更改后将于下一个任务分块生效。
2738
/// </summary>
@@ -50,6 +61,29 @@ public bool ClearRefreshInterval()
5061
return true;
5162
}
5263

64+
private TimeSpan? _timeout = timeout;
65+
66+
/// <summary>
67+
/// 任务分片超时。该超时仅对该实例生效,未指定值时,将使用全局默认值 <see cref="DefaultTimeout"/>。<br/>
68+
/// 使用 <see cref="ClearTimeout"/> 可使此值变为未指定状态。
69+
/// </summary>
70+
public TimeSpan Timeout
71+
{
72+
get => _timeout ?? DefaultTimeout;
73+
set => _timeout = value;
74+
}
75+
76+
/// <summary>
77+
/// 清除任务分片超时。清除后将回到未指定状态,并使用全局默认值。
78+
/// </summary>
79+
/// <returns>若原值为未指定状态则返回 <c>false</c>,否则返回 <c>true</c></returns>
80+
public bool ClearTimeout()
81+
{
82+
if (_timeout == null) return false;
83+
_timeout = null;
84+
return true;
85+
}
86+
5387
/// <summary>
5488
/// 限制最大并发数量。该数量仅作用于该实例,更改后将于下一个任务分块生效。
5589
/// </summary>
@@ -60,39 +94,68 @@ public bool ClearRefreshInterval()
6094
/// </summary>
6195
public int SchedulerId { get; } = ++_schedulerCount;
6296

63-
private ConcurrentQueue<DownloadItem> _downloadQueue = [];
97+
/// <summary>
98+
/// 正在进行的下载项。
99+
/// </summary>
100+
public DownloadItem[] RunningItems { get; private set; } = [];
101+
102+
private readonly ConcurrentQueue<DownloadItem> _downloadQueue = [];
64103

65104
private Thread? _schedulerThread;
66105

106+
/// <summary>
107+
/// 添加一个下载项。
108+
/// </summary>
109+
/// <param name="item">要添加的下载项</param>
67110
public void AddItem(DownloadItem item)
68111
{
69-
LogWrapper.Trace(LogModule, $"#{SchedulerId} 新增任务: {item}");
112+
LogWrapper.Trace(LogModule, $"#{SchedulerId} 新增项目: {item}");
70113
_downloadQueue.Enqueue(item);
71114
}
115+
116+
private CancellationTokenSource? _cancelTokenSource;
117+
118+
/// <summary>
119+
/// </summary>
120+
/// <returns>若调度器未开始运行则返回 <c>false</c>,否则返回 <c>true</c></returns>
121+
public bool Cancel()
122+
{
123+
if (_cancelTokenSource == null) return false;
124+
_cancelTokenSource.Cancel();
125+
return true;
126+
}
72127

73128
private int _currentParallelCount = 0;
74129

75130
private bool _CanStartNewParallel =>
76131
_currentParallelCount < MaxParallels && _currentParallelCount < ParallelTaskLimit;
77132

133+
/// <summary>
134+
/// 启动调度器
135+
/// </summary>
136+
/// <param name="cancelToken">取消操作通知</param>
137+
/// <exception cref="InvalidOperationException"></exception>
78138
public void Start(CancellationToken cancelToken = default)
79139
{
80140
if (_schedulerThread != null) throw new InvalidOperationException("Cannot start twice");
141+
_cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancelToken);
142+
var cToken = _cancelTokenSource.Token;
81143
_schedulerThread = new Thread(() =>
82144
{
83145
var dequeued = new List<DownloadItem>();
84-
while (!cancelToken.IsCancellationRequested)
146+
while (!cToken.IsCancellationRequested)
85147
{
86148
if (_CanStartNewParallel && _downloadQueue.TryDequeue(out var item))
87149
{
150+
if (item.Status is DownloadItemStatus.Cancelled or DownloadItemStatus.Success) continue;
88151
dequeued.Add(item);
89-
if (item.Status == DownloadItemStatus.Waiting) _StartNewItem(item);
90-
// TODO more processing
152+
if (item.Status is not DownloadItemStatus.Starting) _StartNewParallel(item, cToken);
91153
}
92154
else
93155
{
94-
if (cancelToken.IsCancellationRequested) break;
95-
Thread.Sleep(RefreshInterval);
156+
try { Task.Delay(RefreshInterval, cToken).Wait(cToken); }
157+
catch (Exception) { break; }
158+
RunningItems = dequeued.ToArray();
96159
dequeued.ForEach(i => _downloadQueue.Enqueue(i));
97160
dequeued.Clear();
98161
}
@@ -104,8 +167,62 @@ public void Start(CancellationToken cancelToken = default)
104167
_schedulerThread.Start();
105168
}
106169

107-
private void _StartNewItem(DownloadItem item)
170+
private void _RunParallelTask(Task task)
108171
{
109-
throw new NotImplementedException();
172+
_currentParallelCount++;
173+
// TODO limit threads
174+
Task.Run(async () =>
175+
{
176+
try { await task; }
177+
catch (Exception ex) { LogWrapper.Error(ex, LogModule, "下载任务执行出错"); }
178+
finally { _currentParallelCount--; }
179+
});
180+
}
181+
182+
private void _StartNewParallel(DownloadItem item, CancellationToken cToken)
183+
{
184+
Task? task = null;
185+
if (item.Status == DownloadItemStatus.Waiting)
186+
{
187+
// 开始第一个分片 (也可能只有这一个 即不分片)
188+
task = item.NewSegment(0, null, ErrorCallback);
189+
}
190+
else for (
191+
var currentNode = item.Segments.First;
192+
currentNode != null && !cToken.IsCancellationRequested;
193+
currentNode = currentNode.Next)
194+
{
195+
// 分析正在运行的分片
196+
var seg = currentNode.Value;
197+
if (seg.Status != DownloadSegmentStatus.Running) continue;
198+
if (seg.CurrentChunkStartTime - DateTime.Now > Timeout)
199+
{
200+
// 超时重试
201+
task = item.RestartSegment(currentNode, true);
202+
break;
203+
}
204+
// 估计下载速度 防止重复下载同样的内容
205+
var transferLengthByTimeout = seg.ChunkSize / seg.LastChunkElapsedTime.Ticks * Timeout.Ticks;
206+
if (seg.RemainingLength <= transferLengthByTimeout) continue;
207+
// 开始新的分片
208+
var start = (seg.NextPosition + seg.EndPosition) / 2;
209+
var end = seg.EndPosition;
210+
task = item.NewSegment(start, end, ErrorCallback, currentNode);
211+
break;
212+
}
213+
if (task != null) _RunParallelTask(task);
214+
return;
215+
216+
void ErrorCallback(DownloadSegmentStatus status, Exception? lastException)
217+
{
218+
if (status == DownloadSegmentStatus.FailedNotSupportRange)
219+
{
220+
item.TrySegment = false;
221+
return;
222+
}
223+
var statusCode = (int)status;
224+
LogWrapper.Warn(lastException, LogModule, $"下载失败 ({statusCode}): {item}");
225+
item.Cancel(true);
226+
}
110227
}
111228
}

0 commit comments

Comments
 (0)