前言

在IM项目(Android)中,聊天页面,进入会展示历史消息,而历史消息存下来的发送者信息可能并不是最新的,所以需要去刷新数据。单聊场景只需要刷新对方一个人信息,实现较为简单。但是到群聊,发送者众多,不可能每次进入页面都去获取全部成员的信息(数量大,获取缓慢),所以需要制定策略去实现好的效果。

需求分析

期望:

  1. 只去刷新显示在屏幕上的发送者信息。
  2. 每个发送者只需要刷新一次。(做个缓存)
  3. 屏幕滚动很快,中途显示的不去刷新。
  4. 如果其他地方缓存过了这个成员,就不再去获取。
  5. 群成员信息修改,及时刷新缓存数据。

方案设计

设计:

  1. 在recycler的onBindVH里收集消息列表里的发送者的ID(imAccount)。
  2. 收集到数据池(只收集不是最新数据的,防止反复收集),对imAccount去重,大小为10。利用LRU的缓存淘汰imAccount。
  3. 静置0.5秒后开始将缓存池内容发射请求。(即屏幕停止了滑动,或滑动没时新的item添加到屏幕)。
  4. 每个imAccount对应一个锁对象,保证异步下同一个imAccount只会请求一次。
  5. 结合群成员信息做缓存。(群成员缓存获取过了,如进过群成员页等, 就不再去请求,直接使用缓存里的数据)
  6. 刷新成功一个imAccount则会把整个列表里同一个发送者的信息都刷新掉。
  7. 数据刷新成功,回调刷新UI列表。需要绑定聊天页面生命周期。
  8. 收到群成员信息修改通知消息,修改缓存数据。

流程图:
Sander流程图.jpg

代码实现

该部分功能需要结合成员缓存功能。请看:IM项目中群成员获取与缓存策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class SenderHelper private constructor() : DefaultLifecycleObserver {

companion object {
private const val CACHE_MAX_SIZE = 10
private const val COUNT_DOWN_DELAY = 500L
// 保证一对一的关系。
private val map = WeakHashMap<LifecycleOwner, SenderHelper>()

fun with(owner: LifecycleOwner, observer: Observer<List<String>>): SenderHelper {
return map[owner] ?: SenderHelper().apply {
map[owner] = this
with(owner, observer)
}
}

fun get(owner: LifecycleOwner): SenderHelper? {
return map[owner]
}

fun get(sessionId: String): SenderHelper? {
return map.values.find { it.sessionId == sessionId }
}
}

// 回调的 liveData。
private val liveData = MutableLiveData<List<String>>()
// rx。
private var compositeDisposable = CompositeDisposable()
// 入参缓存池。
private val cache = LruCache<String, Unit>(CACHE_MAX_SIZE)
// 结果列表。
private val resultList = CopyOnWriteArrayList<String>()
// 锁对象 map。
private val lockMap = ConcurrentHashMap<String, Lock>()
// data。
private var groupCode: String = ""
private var sessionId: String = ""
private lateinit var dataList: (Unit) -> List<SenderModel>
private val memberSet by lazy {
MemberHelper.getIfAbsent(groupCode)
}

private val handler = Handler()
private val runnable = Runnable {
cache.snapshot().keys.apply {
forEach { k -> cache.remove(k) }
task(this.toList())
}
}

/**
* 初始化。
*/
fun init(sessionId: String, groupCode: String, dataList: (Unit) -> List<SenderModel>) {
this.sessionId = sessionId
this.groupCode = groupCode
this.dataList = dataList
}

/**
* 获取最新数据。
*/
fun bind(sender: SenderModel) {
// 如果是自己,直接返回。
if (sender.isSelf || sender.imAccount.isEmpty()) return
// 如果最新,直接返回。
memberSet.get(sender.imAccount)?.let {
if (compare(sender, it).falseRun { changeListAndPost(it) }) return
}
// 存入缓存池。
cache.get(sender.imAccount) ?: cache.put(sender.imAccount, Unit)
countDown()
}

/**
* 主动刷新名称。
*/
fun updateNickname(imAccount: String, nickname: String) {
memberSet.get(imAccount)?.let {
it.nickName = nickname
changeListAndPost(it)
}
}

/**
* 主动刷新身份。
*/
fun updateGroupRole(imAccount: String, groupRole: Int) {
memberSet.get(imAccount)?.let {
it.groupRole = groupRole
changeListAndPost(it)
}
}

override fun onDestroy(owner: LifecycleOwner) {
compositeDisposable.clear()
handler.removeCallbacksAndMessages(null)
map.remove(owner)
}

//---------private method-----------//

/**
* 绑定生命周期和观察。
*/
private fun with(owner: LifecycleOwner, observer: Observer<List<String>>) {
owner.lifecycle.addObserver(this)
liveData.observe(owner, observer)
}

/**
* 延时计时。
*/
private fun countDown() {
handler.removeCallbacksAndMessages(null)
handler.postDelayed(runnable, COUNT_DOWN_DELAY)
}

/**
* 任务。
*/
private fun task(imAccountList: List<String>) {
Observable
.fromIterable(imAccountList)
.flatMap { work(it) }
.doFinally {
if (resultList.isNotEmpty()) {
liveData.postValue(ArrayList(resultList))
resultList.clear()
}
}
.subscribe({}, {})
.addToComposite()
}

/**
* 工作。各自开辟子线程。
*/
private fun work(imAccount: String): Observable<*> {
return Observable.just(imAccount)
.subscribeOn(Schedulers.io())
.flatMap {
synchronized(getLock(it).lock) {
if (memberSet.get(it) == null) {
netWork(it)
} else {
Observable.just(it)
}
}
}
}

/**
* 网络操作。与工作同一个线程。
*/
private fun netWork(imAccount: String): Observable<*> {
return MemberHelper
.loadMember(sessionId, imAccount)
.filter { it.status && it.entry != null }
.map { it.entry!! }
.doOnNext {
resultList.add(it.imAccount.orEmpty())
memberSet.put(it)
updateDb(it)
changeList(it)
}
}

/**
*
* 更新数据库数据。
*/
private fun updateDb(bean: MemberBean) {
...修改数据库实现不重要...
}

/**
* 刷洗数据及发送数据变化信号。
*/
private fun changeListAndPost(bean: MemberBean) {
changeList(bean).trueRun { liveData.postValue(arrayListOf(bean.imAccount.orEmpty())) }
}

/**
* 刷新列表数据。
*/
private fun changeList(bean: MemberBean): Boolean {
val isChange: Boolean
dataList()
.filter { it.imAccount == bean.imAccount && compare(it, bean).not() }
.apply { isChange = this.isNotEmpty() }
.forEach {
it.nickName = bean.nickName.orEmpty()
it.avatar = bean.avatar?.toLoadUrl().orEmpty()
it.setGroupRole(bean.groupRole)
}
return isChange
}

/**
* 比较是否最新了。
*/
private fun compare(sender: SenderModel, bean: MemberBean): Boolean {
return (bean.groupRole == sender.groupRole
&& bean.nickName == sender.nickName
&& bean.avatar?.toLoadUrl() == sender.avatar)
}

/**
* 锁。
*/
class Lock(val lock: Any = Any())

/**
* 获取锁对象。
*/
private fun getLock(imAccount: String): Lock {
return lockMap[imAccount] ?: Lock().apply { lockMap[imAccount] = this }
}

/**
* add 到复合体。
*/
private fun Disposable.addToComposite() {
compositeDisposable.add(this)
}

}

使用:

初始化:

1
2
3
SenderHelper
.with(lifecyclerOwner, Observer { updateList() })
.init(sessionId, groupCode) { getSenderList() }

在recyclerView适配器的onBindVH处:

1
SenderHelper.get(lifecyclerOwner)?.bind(sender)

收到消息主动刷新缓存:

1
2
3
4
// 更新名称。
SenderHelper.get(sessionId)?.updateNickname(imAccount,nickName)
// 更新身份。
SenderHelper.get(sessionId)?.updateGroupRole(imAccount,groupRole)

总结

要点:

  1. 收集最新进入的 imAccount,最多10个。
  2. 静置 0.5 秒,将收集的数据分别请求。
  3. 同一个 imAccount 只能请求一次。
  4. 绑定生命周期,一对一关系。
  5. 与群成员缓存结合。

PS:从这个方案中,可以扩展到列表内容局部数据请求接口刷新的场景。
配一张图