• You don’t even know me!
  • I have the rest of my life to find out.

前言,为什么选择研究下载框架

从2020年4月份,自己开始做外销游戏中心模块后,就一直在关注Android平台下载功能的实现思路与方法。优秀的开源下载框架并不多,在GitHub上可以找到以下几个:

  • FileDownloader,是流利说团队开源的一款下载框架,支持多线程、分块、断点续传等功能。已进化为新项目OKDownload,旧的FileDownloader不再维护。
  • OKDownload,同样是流利说团队出品,是FileDownloader的进化版,在原有基础上进行全方位优化,并增加了大量的自动化测试。
  • PRDownloader,是印度的一个开源项目团队MindOrks出品的下载框架。支持多任务、断点续传,不支持分块下载。相比于前面两个流利说的框架,逻辑更为简单清晰,适合进行学习研究。

表面,接入和使用PRDownloader

把自吹自擂的东西抛到一边,一个框架做的好不好,很重要一点是接入它以及使用起来是否方便。接入这点没什么好说的,通过gradle引入即可,目前最新的版本号为0.6.0,尚不是稳定版本。

1
implementation 'com.mindorks.android:prdownloader:0.6.0'

同时在manifest文件里声明访问网络的权限:

1
<uses-permission android:name="android.permission.INTERNET"/>

接下来是使用部分,第一步是在Application.onCreate()中进行初始化,使用默认构造参数即可:

1
PRDownloader.initialize(getApplicationContext());

也可进行定制,定制项为是否使用数据库(支持进程重启后恢复下载)、读取超时、连接超时等:

1
2
3
4
5
6
7
8
9
10
11
12
// Enabling database for resume support even after the application is killed:
PRDownloaderConfig config = PRDownloaderConfig.newBuilder()
.setDatabaseEnabled(true)
.build();
PRDownloader.initialize(getApplicationContext(), config);

// Setting timeout globally for the download network requests:
PRDownloaderConfig config = PRDownloaderConfig.newBuilder()
.setReadTimeout(30_000)
.setConnectTimeout(30_000)
.build();
PRDownloader.initialize(getApplicationContext(), config);

进行完初始化后,就可以在Activity里面调用下载功能了。

启动下载

使用静态方法PRDownloader.download(url, dirPath, fileName)进行下载,会返回一个downloadId,该id用于后续的暂停、恢复、获取状态等操作。在启动下载时,也可设置下载进度、成功、失败监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
int downloadId = PRDownloader.download(url, dirPath, fileName)
.build()
.setOnStartOrResumeListener(new OnStartOrResumeListener() {
@Override
public void onStartOrResume() {

}
})
.setOnPauseListener(new OnPauseListener() {
@Override
public void onPause() {

}
})
.setOnCancelListener(new OnCancelListener() {
@Override
public void onCancel() {

}
})
.setOnProgressListener(new OnProgressListener() {
@Override
public void onProgress(Progress progress) {

}
})
.start(new OnDownloadListener() {
@Override
public void onDownloadComplete() {

}

@Override
public void onError(Error error) {

}
});

暂停和恢复下载

1
2
3
4
// 暂停
PRDownloader.pause(downloadId);
// 恢复
PRDownloader.resume(downloadId);

取消下载

1
2
3
4
5
6
// Cancel with the download id
PRDownloader.cancel(downloadId);
// The tag can be set to any request and then can be used to cancel the request
PRDownloader.cancel(TAG);
// Cancel all the requests
PRDownloader.cancelAll();

获取下载状态

1
Status status = PRDownloader.getStatus(downloadId);

清空临时文件

1
2
// Method to clean up temporary resumed files which is older than the given day
PRDownloader.cleanUp(days);

原理部分

初始化部分,PRDownloader.initialize()

PRDownloader维护了一个全局的下载器,在开始下载前需要进行初始化,代码位于PRDownloader.java

1
2
3
4
public static void initialize(Context context, PRDownloaderConfig config) {
ComponentHolder.getInstance().init(context, config);
DownloadRequestQueue.initialize();
}

ComponentHolder明显是一个单例,其中维护了以下变量:

1
2
3
4
5
private int readTimeout; // 网络读取超时
private int connectTimeout; // 网络连接超时
private String userAgent; // UA
private HttpClient httpClient;
private DbHelper dbHelper;

我们可以对这些变量自由配置,或者使用PRDownloaderConfig.newBuilder().build()的默认实现,配置对象位于类PRDownloaderConfig.java当中,基于构建器模式。

1
2
3
4
5
6
7
public static class Builder {
int readTimeout = Constants.DEFAULT_READ_TIMEOUT_IN_MILLS;
int connectTimeout = Constants.DEFAULT_CONNECT_TIMEOUT_IN_MILLS;
String userAgent = Constants.DEFAULT_USER_AGENT;
HttpClient httpClient = new DefaultHttpClient();
boolean databaseEnabled = false;
// 以下省略无关代码

复杂一些的对象是网络HttpClient和数据库DBHelper,下面单独进行解读。

HttpClient

封装了网络请求的具体实现,定义接口HttpClient,并提供了默认实现DefaultHttpClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// HttpClient.java
public interface HttpClient extends Cloneable {

HttpClient clone();

void connect(DownloadRequest request) throws IOException;

int getResponseCode() throws IOException;

InputStream getInputStream() throws IOException;

long getContentLength();

String getResponseHeader(String name);

void close();

Map<String, List<String>> getHeaderFields();

InputStream getErrorStream() throws IOException;

}

默认的实现采用了URLConnection方案,基于这种封装设计,可以无缝切换到OkHttp等实现,出于篇幅考虑,这里不再贴出DefaultHttpClient具体实现。

DBHelper

同样采用了接口设计,主要操作对象为DownloadModel,有CRUD功能,此外还有updateProgress方法,用以更新下载进度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// DBHelper.java
public interface DbHelper {

DownloadModel find(int id);

void insert(DownloadModel model);

void update(DownloadModel model);

void updateProgress(int id, long downloadedBytes, long lastModifiedAt);

void remove(int id);

List<DownloadModel> getUnwantedModels(int days);

void clear();

}

// DownloadModel.java
public class DownloadModel {
private int id; // 唯一id
private String url; // 下载地址
private String eTag; // Response中的ETag字段
private String dirPath; // 下载目录
private String fileName; // 保存的文件名
private long totalBytes; // 总大小
private long downloadedBytes; // 已下载大小,每次同步下载进度时会更新该字段
private long lastModifiedAt; // 更新时间,每次同步下载进度时会更新该字段
// 以下无关getter/setter代码省略
}

以上是PRDownloader默认的全局配置,可以看到除了定义一些网络请求常量外,比较重要的是维护了两个单例:HttpClient和DBHelper。在PRDownloader.initialize()方法里,它还对DownloadRequestQueue进行了初始化,我们继续分析这部分代码。

DownloadRequestQueue

它也是单例实现,是一个全局下载任务队列,其初始化函数调用了构造方法,初始化内部的请求Map、序列号发生器。请求Map用以在全局维护下载任务,Key为下载任务id,Value为下载任务。序列号发生器用来给每个任务生成唯一的序列id,作用是提供任务排序依据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// DownloadRequestQueue.java
public class DownloadRequestQueue {

private static DownloadRequestQueue instance;
private final Map<Integer, DownloadRequest> currentRequestMap;
private final AtomicInteger sequenceGenerator;

private DownloadRequestQueue() {
currentRequestMap = new ConcurrentHashMap<>();
sequenceGenerator = new AtomicInteger();
}

public static void initialize() {
getInstance();
}

public static DownloadRequestQueue getInstance() {
if (instance == null) {
synchronized (DownloadRequestQueue.class) {
if (instance == null) {
instance = new DownloadRequestQueue();
}
}
}
return instance;
}

提交下载任务,PRDownloader.download(url, dirPath, fileName)

完成初始化以后,就可以使用PRDownloader的各项功能,以下载为例探究其内部的实现。

启动下载的入口是PRDownloader.download(url, dirPath, fileName),启动流程为 构建下载任务->设置下载回调->启动下载任务。

构建下载任务

DownloadRequest的构建同样采用构建器模式,入参为下载链接、存储的路径和文件名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// PRDownloader.java
public static DownloadRequestBuilder download(String url, String dirPath, String fileName) {
return new DownloadRequestBuilder(url, dirPath, fileName);
}

// DownloadRequestBuilder.java
public class DownloadRequestBuilder implements RequestBuilder {

String url;
String dirPath;
String fileName;
Priority priority = Priority.MEDIUM;
Object tag;
int readTimeout;
int connectTimeout;
String userAgent;
HashMap<String, List<String>> headerMap;

public DownloadRequestBuilder(String url, String dirPath, String fileName) {
this.url = url;
this.dirPath = dirPath;
this.fileName = fileName;
}
// 以下省略无关代码

DownloadRequestBuilderDownloadRequest的构建器,主要起作用的属性只有3个:url、dirPath、fileName,其它属性与PRDownloaderConfig类里面的重复了,并没有使用到。

DownloadRequest是一个下载任务的呈现,是一个动态的东西,它跟DownloadModel的区别在于:

  • DownloadModel:是静态的,记录任务在某个时间点的状态(id,地址链接,总大小,已下载大小,更新时间),用于持久化,并且能从持久化数据源里恢复
  • DownloadRequestBuilder:是动态的,不仅包含DownloadModel,还有下载时的各种回调(OnProgressListener、OnDownloadListener、OnStartListener等),此外还有重要的一个字段downloadId,是下载任务的唯一标识符

设置下载回调

下载过程中有多种回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 开始下载回调
public interface OnStartOrResumeListener {
void onStartOrResume();
}
// 暂停回调
public interface OnPauseListener {
void onPause();
}
// 下载取消回调
public interface OnCancelListener {
void onCancel();
}
// 下载完成/失败回调,叫onFinishListener更好
public interface OnDownloadListener {
void onDownloadComplete();
void onError(Error error);
}
// 下载进度回调
public interface OnProgressListener {
void onProgress(Progress progress);
}

可以在DownloadRequest对象上调用setXXXListener()方法,用以设置下载过程中的回调。设置完成之后,回调是如何发生作用的呢?这就到了【开始下载】的逻辑里。

开始下载

调用DownloadRequest.start方法开始下载任务。

1
2
3
4
5
6
7
// DownloadRequest.java
public int start(OnDownloadListener onDownloadListener) {
this.onDownloadListener = onDownloadListener;
downloadId = Utils.getUniqueId(url, dirPath, fileName);
DownloadRequestQueue.getInstance().addRequest(this);
return downloadId;
}

关键的一句命令在DownloadRequestQueue.getInstance().addRequest(this)

1
2
3
4
5
6
7
8
9
10
// DownloadRequestQueue.java
public void addRequest(DownloadRequest request) {
currentRequestMap.put(request.getDownloadId(), request);
request.setStatus(Status.QUEUED);
request.setSequenceNumber(getSequenceNumber());
request.setFuture(Core.getInstance()
.getExecutorSupplier()
.forDownloadTasks()
.submit(new DownloadRunnable(request)));
}

addRequest的过程,是提交下载任务到执行队列的过程。Core.getInstance().getExecutorSupplier().forDownloadTasks()获取到了一个线程池,该线程池用以执行下载任务。

Core.java是单例,持有一个ExecutorSupplier的实现,后者提供三种Executor,分别用于下载、后台任务、UI任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Core.java
public class Core {

private static Core instance = null;
private final ExecutorSupplier executorSupplier;

private Core() {
this.executorSupplier = new DefaultExecutorSupplier();
}

public static Core getInstance() {
if (instance == null) {
synchronized (Core.class) {
if (instance == null) {
instance = new Core();
}
}
}
return instance;
}

public ExecutorSupplier getExecutorSupplier() {
return executorSupplier;
}

public static void shutDown() {
if (instance != null) {
instance = null;
}
}
}

// ExecutorSupplier.java
public interface ExecutorSupplier {
DownloadExecutor forDownloadTasks();
Executor forBackgroundTasks();
Executor forMainThreadTasks();
}

// DefaultExecutorSupplier.java
public class DefaultExecutorSupplier implements ExecutorSupplier {
// 下载线程池大小,内核数*2+1
private static final int DEFAULT_MAX_NUM_THREADS = 2 * Runtime.getRuntime().availableProcessors() + 1;
// 下载Executor
private final DownloadExecutor networkExecutor;
private final Executor backgroundExecutor;
private final Executor mainThreadExecutor;

DefaultExecutorSupplier() {
ThreadFactory backgroundPriorityThreadFactory = new PriorityThreadFactory(Process.THREAD_PRIORITY_BACKGROUND);
networkExecutor = new DownloadExecutor(DEFAULT_MAX_NUM_THREADS, backgroundPriorityThreadFactory);
// 后台任务单线程Executor
backgroundExecutor = Executors.newSingleThreadExecutor();
// 主线程Executor
mainThreadExecutor = new MainThreadExecutor();
}

@Override
public DownloadExecutor forDownloadTasks() {
return networkExecutor;
}

@Override
public Executor forBackgroundTasks() {
return backgroundExecutor;
}

@Override
public Executor forMainThreadTasks() {
return mainThreadExecutor;
}
}

Executor.submit()返回一个Future对象,将其保存在DownloadRequest中,用来进行取消任务。

具体的下载逻辑,在DownloadRunnable中。

DownloadRunnable,下载具体实现

从名字上就可以看出它是一个用来提交给Executor的Runnable,关键方法run()是耗时操作,阻塞子线程,直至执行完成后通知request中的各个监听者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// DownloadRunnable.java
@Override
public void run() {
request.setStatus(Status.RUNNING);
DownloadTask downloadTask = DownloadTask.create(request);
Response response = downloadTask.run();
if (response.isSuccessful()) {
request.deliverSuccess();
} else if (response.isPaused()) {
request.deliverPauseEvent();
} else if (response.getError() != null) {
request.deliverError(response.getError());
} else if (!response.isCancelled()) {
request.deliverError(new Error());
}
}

// Response.java,包装执行结果字段
public class Response {
private Error error;
private boolean isSuccessful;
private boolean isPaused;
private boolean isCancelled;
// 以下getter/setter代码省略
}

提交任务的逻辑被转交给DownloadTask,它的create是构建方法,新建自身的实例并返回,主要关注它的run()方法,其中执行了下载的真正逻辑,在下面的代码里,我把关键处都加上注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// 最好加上@WorkerThread注释
Response run() {

Response response = new Response();

if (request.getStatus() == Status.CANCELLED) {
response.setCancelled(true);
return response;
} else if (request.getStatus() == Status.PAUSED) {
response.setPaused(true);
return response;
}

try {
// 进度回调handler,通知UI进度情况
if (request.getOnProgressListener() != null) {
progressHandler = new ProgressHandler(request.getOnProgressListener());
}

tempPath = Utils.getTempPath(request.getDirPath(), request.getFileName());

File file = new File(tempPath);
// 通过downloadId查询历史任务
DownloadModel model = getDownloadModelIfAlreadyPresentInDatabase();
// 如果有历史任务,且之前下载的文件仍然存在,则继续之前的下载
if (model != null) {
if (file.exists()) {
request.setTotalBytes(model.getTotalBytes());
request.setDownloadedBytes(model.getDownloadedBytes());
} else {
// 历史文件被清除了,则删除对应的历史任务
removeNoMoreNeededModelFromDatabase();
request.setDownloadedBytes(0);
request.setTotalBytes(0);
model = null;
}
}
// HttpClient用以连接网络
httpClient = ComponentHolder.getInstance().getHttpClient();

httpClient.connect(request);

if (request.getStatus() == Status.CANCELLED) {
response.setCancelled(true);
return response;
} else if (request.getStatus() == Status.PAUSED) {
response.setPaused(true);
return response;
}
// 重定向(上限10次),读取header中的Location字段作为新的URL
httpClient = Utils.getRedirectedConnectionIfAny(httpClient, request);

responseCode = httpClient.getResponseCode();

eTag = httpClient.getResponseHeader(Constants.ETAG);
// 是历史任务,则先比对etag是否发生变化,若变化则重新下载
if (checkIfFreshStartRequiredAndStart(model)) {
model = null;
}
// Response Code 落在[200,300) 为Successful
if (!isSuccessful()) {
Error error = new Error();
error.setServerError(true);
error.setServerErrorMessage(convertStreamToString(httpClient.getErrorStream()));
error.setHeaderFields(httpClient.getHeaderFields());
error.setResponseCode(responseCode);
response.setError(error);
return response;
}
// 设置任务支持断点续传与否
setResumeSupportedOrNot();
// 若是历史任务,则读取记录过的总大小
totalBytes = request.getTotalBytes();
// 若不支持断点续传则删除历史文件
if (!isResumeSupported) {
deleteTempFile();
}

if (totalBytes == 0) {
totalBytes = httpClient.getContentLength(); // 从Header里读取大小
request.setTotalBytes(totalBytes);
}
// 不支持断点续传的任务,插入数据库也没有,索性不插入
if (isResumeSupported && model == null) {
createAndInsertNewModel();
}

if (request.getStatus() == Status.CANCELLED) {
response.setCancelled(true);
return response;
} else if (request.getStatus() == Status.PAUSED) {
response.setPaused(true);
return response;
}
// 通知外部监听者:我开始下载了
request.deliverStartEvent();
// 接下来开始写文件
inputStream = httpClient.getInputStream();

byte[] buff = new byte[BUFFER_SIZE];
// 文件不存在则创建文件
if (!file.exists()) {
if (file.getParentFile() != null && !file.getParentFile().exists()) {
if (file.getParentFile().mkdirs()) {
//noinspection ResultOfMethodCallIgnored
file.createNewFile();
}
} else {
//noinspection ResultOfMethodCallIgnored
file.createNewFile();
}
}
// 具体实现为RAF
this.outputStream = FileDownloadRandomAccessFile.create(file);
// 因为是RAF,支持随机写
if (isResumeSupported && request.getDownloadedBytes() != 0) {
outputStream.seek(request.getDownloadedBytes());
}
// 先检查任务状态,若暂停或取消,则直接返回
// 在do-while循环里也有此检查
if (request.getStatus() == Status.CANCELLED) {
response.setCancelled(true);
return response;
} else if (request.getStatus() == Status.PAUSED) {
response.setPaused(true);
return response;
}
// do-while循环下载文件
do {

final int byteCount = inputStream.read(buff, 0, BUFFER_SIZE); // BUFFER_SIZE=4Kb

if (byteCount == -1) {
break;
}

outputStream.write(buff, 0, byteCount);

request.setDownloadedBytes(request.getDownloadedBytes() + byteCount);
// ProgressHandler发送下载进度通知给监听者
sendProgress();
// 同时满足两个条件时,更新数据库:1.数据量>65536,2.时间差>2s
syncIfRequired(outputStream);
// 实施检查任务状态,是否取消或暂停
if (request.getStatus() == Status.CANCELLED) {
response.setCancelled(true);
return response;
} else if (request.getStatus() == Status.PAUSED) {
sync(outputStream);
response.setPaused(true);
return response;
}

} while (true);
// 下载完成,重命名文件
final String path = Utils.getPath(request.getDirPath(), request.getFileName());

Utils.renameFileName(tempPath, path);

response.setSuccessful(true);
// 删除数据库多余记录
if (isResumeSupported) {
removeNoMoreNeededModelFromDatabase();
}

} catch (IOException | IllegalAccessException e) {
if (!isResumeSupported) {
deleteTempFile();
}
Error error = new Error();
error.setConnectionError(true);
error.setConnectionException(e);
response.setError(error);
} finally {
closeAllSafely(outputStream);
}

return response;
}

总结,如何做一个极简版本的下载框架

在大众点评的第一年,从事预订模块开发时,第一次从王旭刚口中听到了“MVP”的概念——Minimum Viable Version,最小可行版本。参考PRDownloader,对于一个下载框架而言,兼顾可维护性、可扩展性和概念的独立性,至少应当具备以下特点。

自下而上地看

  • 抽象的下载对象,可持久化,支持从数据库中恢复——对应DownloadModel
  • 执行下载的对象,职责是进行网络连接,并写入文件——对应DownloadTask
  • 下载任务,包含下载进度、状态回调——对应DownloadRequest
  • 全局下载管理器,维护正在下载的任务列表,通常为单例——对应DownloadRequestQueue
  • 线程池,进行前后台任务分发——对应ExecutorSupplier

以上这些部分,对外都要隐藏起来,并统一包装到PRDownloader对象中,以便抽象和解耦。

最后的最后,献上一张PRDownloader的类图(哈哈哈哈,偏不放在文章开头)

PRDownloader
PRDownloader