5月17日,星期日。今天是第#413期Android Weekly的学习笔记。

Concurrency Frameworks in Android are Overrated

Why Frameworks?
Why Frameworks?

https://www.techyourchance.com/concurrency-frameworks-overrated-android/

今天的首发文章质量不错,Vasiliy抛出一个观点“Android下的并发框架都是过度设计的,实际上你可以不借助任何框架写出干净的并发代码”,并通过样例代码佐证自己的这一观点。

需求简述

模拟一个合并文件并上传的过程,文件操作和网络操作必须在工作线程进行,并在主线程通知UI操作结果。

  1. 把两组文件分别进行合并
  2. 把合并结果进行压缩
  3. 上传压缩后的zip包到服务器
  4. 通知操作结果

只用原生的并发原语实现

第一个版本没有考虑太多性能,在工作线程顺序执行耗时操作,并通过Handler通知UI线程。在最终一步网络操作的回调里,通知执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class UploadFilesUseCase extends BaseObservable<UploadFilesUseCase.Listener> {

public interface Listener {
void onFilesUploaded();
void onFilesUploadFailed();
}

private final Handler uiHandler = new Handler(Looper.getMainLooper());

public void uploadFiles() {
new Thread(() -> uploadFilesSync()).start();
}

@WorkerThread
private void uploadFilesSync() {
File mergedA = processAndMergeFilesOfTypeA();
File mergedB = processAndMergeFilesOfTypeB();
File archive = compressMergedFiles(mergedA, mergedB);

HttpManager.getInstance.uploadFiles(
archive,
new HttpRequestListener() {
@Override
public void onDone(int code, byte[] body) {
if (code / 100 == 2) {
notifySuccess();
} else {
notifyFailure();
}
}

@Override
public void onFailure() {
notifyFailure();
}
}
);
}

@WorkerThread
private File processAndMergeFilesOfTypeA() { ... }

@WorkerThread
private File processAndMergeFilesOfTypeB() { ... }

@WorkerThread
private File compressMergedFiles(File fileA, File fileB) { ... }

private void notifySuccess() {
uiHandler.post(() -> {
for (Listener listener : getListeners()) {
listener.onFilesUploaded();
}
});
}

private void notifyFailure() {
uiHandler.post(() -> {
for (Listener listener : getListeners()) {
listener.onFilesUploadFailed();
}
});
}

}

增加异常和重试处理

如果合并文件、压缩文件过程中可能抛出异常,如下函数签名所示:

1
2
3
4
5
6
7
8
@WorkerThread
private File processAndMergeFilesOfTypeA() throws OperationFailedException { ... }

@WorkerThread
private File processAndMergeFilesOfTypeB() throws OperationFailedException { ... }

@WorkerThread
private File compressMergedFiles(File fileA, File fileB) throws OperationFailedException { ... }

则通过try...catch进行捕获,并通知失败。

1
2
3
4
5
6
7
8
try {
File mergedA = processAndMergeFilesOfTypeA();
File mergedB = processAndMergeFilesOfTypeB();
archive = compressMergedFiles(mergedA, mergedB);
} catch (OperationFailedException e) {
notifyFailure();
return;
}

然后我们加上重试机制,当前已经重试的次数作为参数传入uploadFilesSync函数,并把原来notifyFailuer处均替换为retryOrFail(retryCount)调用,其实现如下:

1
2
3
4
5
6
7
8
@WorkerThread
private void retryOrFail(int currentRetryCount) {
if (currentRetryCount >= MAX_RETRIES - 1) {
notifyFailure();
} else {
uploadFilesSync(currentRetryCount + 1); // 重试次数+1
}
}

并发运行优化效率

注意到两处merge文件的操作是可以并行处理的,因此在工作线程之外分起两个线程,并通过CountDownLatch(2)来等待两个线程处理完成。由于涉及并发赋值操作,必须使用AtomicReference保证赋值操作的原子性!

这段代码实现太优美了!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@WorkerThread
private void uploadFilesSync(int retryCount) {

final AtomicReference<File> mergedA = new AtomicReference<>(null);
final AtomicReference<File> mergedB = new AtomicReference<>(null);

final CountDownLatch countDownLatch = new CountDownLatch(2);

new Thread(() -> {
try {
mergedA.set(processAndMergeFilesOfTypeA());
} catch (OperationFailedException e) {
// log the exception
} finally {
countDownLatch.countDown();
}
}).start();

new Thread(() -> {
try {
mergedB.set(processAndMergeFilesOfTypeB());
} catch (OperationFailedException e) {
// log the exception
} finally {
countDownLatch.countDown();
}
}).start();

try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException("unexpected interrupt");
}

if (mergedA.get() == null || mergedB.get() == null) {
retryOrFail(retryCount);
return;
}
// 后续是合并&上传的代码,省略

将异步的网络操作同步化

作者在文中提到,由于在接手开发时,已经有了基于回调的网络框架,因此在以上代码里均使用了回调方式处理网络结果。如果回调过多的话,会产生“回调地狱”,因此,进一步优化,将异步通过CountDownLatch(1)转化为同步方法。返回值为int类型,并且会抛出运行时异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@WorkerThread
private int uploadFileToServer(File archive) {
final AtomicInteger responseCode = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1);
HttpManager.getInstance.uploadFiles(
archive,
new HttpRequestListener() {
@Override
public void onDone(int code, byte[] body) {
responseCode.set(code);
countDownLatch.countDown();
}
@Override
public void onFailure() {
responseCode.set(0);
countDownLatch.countDown();
}
}
);
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException("unexpected interrupt");
}
return responseCode.get();
}

防止并发调用,避免进入异常状态

尽管上述的业务需求都已经实现,对于这种场景,还有一种隐形的技术考虑:不可以在同一时间将同一份数据上传多次,这会导致意料之外的bug,严重的话甚至会损坏服务器的数据。

通过BaseBusyObservable,提供了一个AtomicBoolean类型的标志位,说明当前任务的执行状态,并且在开始上传文件时申请进入busy,在上传结束时释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
public class UploadFilesUseCase extends BaseBusyObservable<UploadFilesUseCase.Listener> {

public interface Listener {
void onFilesUploaded();
void onFilesUploadFailed();
}

private final static int MAX_RETRIES = 3;

private final Handler uiHandler = new Handler(Looper.getMainLooper());

public void uploadFiles() {
if (!isFreeAndBecomeBusy()) { // 申请进入busy
// log concurrent invocation attempt
return;
}
new Thread(() -> uploadFilesSync(0)).start();
}

@WorkerThread
private void uploadFilesSync(int retryCount) {
final AtomicReference<File> mergedA = new AtomicReference<>(null);
final AtomicReference<File> mergedB = new AtomicReference<>(null);

final CountDownLatch countDownLatch = new CountDownLatch(2);

new Thread(() -> {
try {
mergedA.set(processAndMergeFilesOfTypeA());
} catch (OperationFailedException e) {
// log the exception
} finally {
countDownLatch.countDown();
}
}).start();

new Thread(() -> {
try {
mergedB.set(processAndMergeFilesOfTypeB());
} catch (OperationFailedException e) {
// log the exception
} finally {
countDownLatch.countDown();
}
}).start();

try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException("unexpected interrupt");
}

if (mergedA.get() == null || mergedB.get() == null) {
retryOrFail(retryCount);
return;
}

File archive;
try {
archive = compressMergedFiles(mergedA.get(), mergedB.get());
} catch (OperationFailedException e) {
retryOrFail(retryCount);
return;
}

int responseCode = uploadFileToServer(archive);

if (responseCode / 100 == 2) {
deleteTempDir();
notifySuccess();
} else {
retryOrFail(retryCount);
}
}

@WorkerThread
private int uploadFileToServer(File archive) {
final AtomicInteger responseCode = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1);

HttpManager.getInstance.uploadFiles(
archive,
new HttpRequestListener() {
@Override
public void onDone(int code, byte[] body) {
responseCode.set(code);
countDownLatch.countDown();
}

@Override
public void onFailure() {
responseCode.set(0);
countDownLatch.countDown();
}
}
);

try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException("unexpected interrupt");
}

return responseCode.get();
}

@WorkerThread
private void retryOrFail(int currentRetryCount) {
deleteTempDir();
if (currentRetryCount >= MAX_RETRIES - 1) {
notifyFailure();
return;
}
uploadFilesSync(currentRetryCount + 1);
}

@WorkerThread
private File processAndMergeFilesOfTypeA() throws OperationFailedException { ... }

@WorkerThread
private File processAndMergeFilesOfTypeB() throws OperationFailedException { ... }

@WorkerThread
private File compressMergedFiles(File fileA, File fileB) throws OperationFailedException { ... }

@WorkerThread
private void deleteTempDir() { ... }

private void notifySuccess() {
uiHandler.post(() -> {
for (Listener listener : getListeners()) {
listener.onFilesUploaded();
}
becomeNotBusy(); // 申请离开busy
});
}

private void notifyFailure() {
uiHandler.post(() -> {
for (Listener listener : getListeners()) {
listener.onFilesUploadFailed();
}
becomeNotBusy(); // 申请离开busy
});
}

}

以下分别是协程实现、Rxjava实现,很难说它们孰优孰劣,但毫无疑问的是,这两个框架都需要一些学习成本,并且代码在理解起来不如并发原语看起来清晰。在作者Vasiliy看来,他会避免把自己的项目与任何第三方框架进行耦合。以及,RxJava已经度过了它的巅峰期。

Coroutines 实现

https://gist.github.com/ATizik/0431c0313d3d0596de3ce9a0fc82b29f

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class UploadFilesUsecase() : BaseObservable<UploadFilesUsecase.Listener>() {
interface Listener {
fun onFilesUploaded()
fun onFilesUploadFailed()
}

private val MAX_RETRIES = 3
private val mutex = Mutex()
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)


fun uploadFiles():Boolean = mutex.tryWithLock {
scope.launch {
repeat(MAX_RETRIES) { retryCount ->
try {
val files = listOf(
async { processAndMergeFilesOfTypeA() },
async { processAndMergeFilesOfTypeB() })
.map { it.await() }

val archive = compressMergedFiles(files)

uploadFileToServer(archive)

notifySuccess()
} catch (t: Throwable) {
//log exception
if (retryCount == MAX_RETRIES - 1) {
notifyFaillure()
}
} finally {
deleteTempDir()
}
}
}
}


private suspend fun uploadFileToServer(archive: File) = suspendCoroutine<Int> { cont ->
HttpManager.uploadFiles(archive,
onDone = { code: Int, body: ByteArray ->
if (code / 100 == 2) {
cont.resume(code)
} else {
cont.resumeWithException(Throwable())
}

},
onFailure = {
cont.resumeWithException(Throwable())
}
)
}

private suspend fun processAndMergeFilesOfTypeA(): File = TODO()

private suspend fun processAndMergeFilesOfTypeB(): File = TODO()

private suspend fun compressMergedFiles(files: List<File>): File = TODO()

private fun deleteTempDir(): Unit = TODO()

private fun notifySuccess() {
MainScope().launch { listeners.forEach { it.onFilesUploaded() } }
}

private fun notifyFaillure() {
MainScope().launch { listeners.forEach { it.onFilesUploadFailed() } }
}

}

RxJava 实现

https://gist.github.com/kakai248/d3ac349cf2aa54da7a935fc1ab23024b

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class UploadFilesUseCase(
private val schedulerProvider: SchedulerProvider,
private val httpManager: HttpManager
) {

private var operation: Completable? = null

fun uploadFiles(): Completable = synchronized(this) {
operation
?: (doUploadFiles()
.doFinally { operation = null }
.cache()
.also { operation = it })
}

private fun doUploadFiles(): Completable =
Singles
.zip(
processAndMergeFilesOfTypeA().subscribeOn(schedulerProvider.io),
processAndMergeFilesOfTypeB().subscribeOn(schedulerProvider.io)
)
.flatMap { (fileA, fileB) -> compressMergedFiles(fileA, fileB) }
.flatMap(::uploadFileToServer)
.ignoreElement()
.doOnComplete(::deleteTempDir)
.doOnError { deleteTempDir() }
.retry(MAX_RETRIES)
.observeOn(schedulerProvider.ui)

private fun uploadFileToServer(archive: File) =
httpManager.uploadFiles(archive)
.map { response ->
if (response.code / 100 != 2) {
throw OperationFailedException()
}
}

private fun processAndMergeFilesOfTypeA(): Single<File> = Single.just(File(""))

private fun processAndMergeFilesOfTypeB(): Single<File> = Single.just(File(""))

private fun compressMergedFiles(fileA: File, fileB: File): Single<File> = Single.just(File(""))

private fun deleteTempDir() {}

companion object {
private const val MAX_RETRIES = 3L
}
}

class HttpManager {
fun uploadFiles(archive: File): Single<Response> = Single.just(Response(200, byteArrayOf()))
}

class Response(
val code: Int,
val body: ByteArray
)

class OperationFailedException : Throwable()

Restore RecyclerView scroll position

https://medium.com/androiddevelopers/restore-recyclerview-scroll-position-a8fbdc9a9334

丢失的位置

我们知道当Activity/Fragment重建后,Adapter的数据会重新加载(往往是异步的),在RecyclerView进行layout之前,数据没有加载完成的话,会导致RecyclerView失去之前的状态,最直观表现是滑动位置归零。

Google在RecyclerView的1.2.0-alpha02版本提供了一个新的API,可以让Adapter阻塞layout过程直至它数据恢复完毕,用以解决RecyclerView状态不一致的问题。

恢复位置

有多种方式可以恢复之前丢失的滑动位置,最好的方法时确保在第一次layout之前,已经设置好了Adapter的状态,若要达成这种效果,数据断然不可存储在Activity/Fragment中,而是应担使用ViewModel或者独立的Repository。如果不使用这个方法,其它的手段要么过于复杂,要么容易出错(比如误用LayoutManager.onRestoreInstanceState)。

recyclerview:1.2.0-alpha02提供的解决方案是,在Adapter类中新增了一个接口,设置状态恢复策略(restoration policy),对应的枚举是StateRestorationPolicy。共有3种枚举值。

  • ALLOW —— 默认值,直接恢复RecyclerView的状态
  • PREVENT_WHEN_EMPTY —— 仅当adapter非空(adapter.getItemCount() > 0)时才去恢复RecyclerView的状态。如果你的数据是异步加载的,RecyclerView会一直等到数据加载完毕才去恢复自身状态。一个特殊场景是,如果你的Adapter包含Header或者Footer时,你应当适用下一种策略PREVENT,除非你使用了同样在1.2.0-alpha2版本新增的MergeAdapter——它会等待自己所有的adapter进入就绪状态。
  • PREVENT —— 推迟所有的状态恢复,直至你设置了ALLOW或者PREVENT_WHEN_EMPTY

有了这个接口,可以很直接地通过以下设置,达成RecyclerView恢复状态的目的:

1
adapter.restorationPolicy = PREVENT_WHEN_EMPTY

MergeAdapter: RecyclerView的另一新特性

同样在1.2.0-alpha2版本中也提出了另一个重磅功能——MergeAdapter,它类似于开源框架MultiType,可以在同一个RecyclerView中组合多个Adapter,从而显示多种样式。可以阅读这篇文章Merge adapters sequentially with MergeAdapter进一步了解。

Clean Dagger

https://proandroiddev.com/clean-dagger-f248eda5790b

关于在Android平台使用DI的一些建议。作者的总结具有借鉴意义:

选择何种框架,这属于实现细节的问题,这些决定应当尽可能晚地做出。一个良好的体系结构不应当依赖于框架的选择。一些项目被描述为“Dagger驱动的架构”,这实际上是错误的。

DI是你应用组件之间的胶水,而非骨架。

Creating the Twitter splash screen in the simplest way possible

https://proandroiddev.com/android-motionlayout-twitter-splash-screen-b5755ed56ee8

仿照Twitter实现的开屏扩张动画。

Twitter Splash
Twitter Splash

视频:How To Stay Up To Date As A Mobile Developer?

https://www.youtube.com/watch?v=BvOn4fAIS34

主要是宏观方面给Android开发者提出的一些建议:

  1. 没有必要掌握Android开发的方方面面,而且也不现实
  2. 在掌握APP基本开发知识的基础上,找一个自己感兴趣的方向进行深入学习
  3. 接上一条,最好是可以在工作中应用该技术,但是应当注意不要引入尚测试阶段不稳定的特性
  4. 初级工程师需要掌握页面、网络请求、构建等基础知识,资深工程师应当在框架方面有相当程度的见解