Commit 90ee9b44 authored by Him188's avatar Him188

Rework reconnection, fixes #228

parent f1ac53f1
......@@ -7,7 +7,7 @@
* https://github.com/mamoe/mirai/blob/master/LICENSE
*/
@file:Suppress("EXPERIMENTAL_API_USAGE", "DEPRECATION_ERROR")
@file:Suppress("EXPERIMENTAL_API_USAGE", "DEPRECATION_ERROR", "INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
package net.mamoe.mirai.qqandroid
......@@ -28,6 +28,7 @@ import kotlinx.serialization.json.int
import net.mamoe.mirai.Bot
import net.mamoe.mirai.BotImpl
import net.mamoe.mirai.LowLevelAPI
import net.mamoe.mirai.ThisApiMustBeUsedInWithConnectionLockBlock
import net.mamoe.mirai.contact.*
import net.mamoe.mirai.data.*
import net.mamoe.mirai.event.broadcast
......@@ -36,6 +37,7 @@ import net.mamoe.mirai.event.events.MessageRecallEvent
import net.mamoe.mirai.event.events.NewFriendRequestEvent
import net.mamoe.mirai.message.MessageReceipt
import net.mamoe.mirai.message.data.*
import net.mamoe.mirai.network.LoginFailedException
import net.mamoe.mirai.qqandroid.contact.MemberInfoImpl
import net.mamoe.mirai.qqandroid.contact.QQImpl
import net.mamoe.mirai.qqandroid.contact.checkIsGroupImpl
......@@ -203,9 +205,14 @@ internal abstract class QQAndroidBotBase constructor(
})
}
/**
* Final process for 'login'
*/
@ThisApiMustBeUsedInWithConnectionLockBlock
@Throws(LoginFailedException::class) // only
override suspend fun relogin(cause: Throwable?) {
client.useNextServers { host, port ->
network.relogin(host, port, cause)
network.closeEverythingAndRelogin(host, port, cause)
}
}
......
......@@ -37,6 +37,7 @@ import net.mamoe.mirai.qqandroid.network.protocol.packet.login.ConfigPushSvc
import net.mamoe.mirai.qqandroid.network.protocol.packet.login.Heartbeat
import net.mamoe.mirai.qqandroid.network.protocol.packet.login.StatSvc
import net.mamoe.mirai.qqandroid.network.protocol.packet.login.WtLogin
import net.mamoe.mirai.qqandroid.utils.NoRouteToHostException
import net.mamoe.mirai.qqandroid.utils.PlatformSocket
import net.mamoe.mirai.qqandroid.utils.io.readPacketExact
import net.mamoe.mirai.qqandroid.utils.io.useBytes
......@@ -64,6 +65,11 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
private val packetReceiveLock: Mutex = Mutex()
override fun areYouOk(): Boolean {
return this.isActive && ::channel.isInitialized && channel.isOpen
&& heartbeatJob?.isActive == true && _packetReceiverJob?.isActive == true
}
private suspend fun startPacketReceiverJobOrKill(cancelCause: CancellationException? = null): Job {
_packetReceiverJob?.cancel(cancelCause)
_packetReceiverJob?.join()
......@@ -104,7 +110,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
}
@OptIn(MiraiExperimentalAPI::class)
override suspend fun relogin(host: String, port: Int, cause: Throwable?) {
override suspend fun closeEverythingAndRelogin(host: String, port: Int, cause: Throwable?) {
heartbeatJob?.cancel(CancellationException("relogin", cause))
heartbeatJob?.join()
if (::channel.isInitialized) {
......@@ -119,10 +125,16 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
channel = PlatformSocket()
// TODO: 2020/2/14 连接多个服务器, #52
withTimeoutOrNull(3000) {
channel.connect(host, port)
} ?: error("timeout connecting server")
logger.info("Connected to server $host:$port")
while (isActive) {
try {
channel.connect(host, port)
break
} catch (e: NoRouteToHostException) {
logger.warning { "No route to host (Mostly due to no Internet connection). Retrying in 3s..." }
delay(3000)
}
}
logger.info { "Connected to server $host:$port" }
startPacketReceiverJobOrKill(CancellationException("relogin", cause))
var response: WtLogin.Login.LoginPacketResponse = WtLogin.Login.SubCommand9(bot.client).sendAndExpect()
......@@ -348,7 +360,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
Unit // dont remove. can help type inference
}
suspend fun doHeartBeat(): Exception? {
private suspend fun doHeartBeat(): Exception? {
val lastException: Exception?
try {
kotlin.runCatching {
......@@ -485,7 +497,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
* 处理从服务器接收过来的包. 这些包可能是粘在一起的, 也可能是不完整的. 将会自动处理.
* 处理后的包会调用 [parsePacketAsync]
*/
internal fun processPacket(rawInput: ByteReadPacket) {
private fun processPacket(rawInput: ByteReadPacket) {
if (rawInput.remaining == 0L) {
return
}
......@@ -563,6 +575,7 @@ internal class QQAndroidBotNetworkHandler(bot: QQAndroidBot) : BotNetworkHandler
/**
* 发送一个包, 但不期待任何返回.
* 不推荐使用它, 可能产生意外的情况.
*/
suspend fun OutgoingPacket.sendWithoutExpect() {
check(bot.isActive) { "bot is dead therefore can't send any packet" }
......
......@@ -15,6 +15,7 @@ import kotlinx.atomicfu.AtomicInt
import kotlinx.atomicfu.atomic
import kotlinx.io.core.*
import net.mamoe.mirai.data.OnlineStatus
import net.mamoe.mirai.network.LoginFailedException
import net.mamoe.mirai.network.NoServerAvailableException
import net.mamoe.mirai.qqandroid.BotAccount
import net.mamoe.mirai.qqandroid.QQAndroidBot
......@@ -125,8 +126,7 @@ internal open class QQAndroidClient(
lateinit var fileStoragePushFSSvcList: FileStoragePushFSSvcListFuckKotlin
internal suspend inline fun useNextServers(crossinline block: suspend (host: String, port: Int) -> Unit) {
@Suppress("UNREACHABLE_CODE", "ThrowableNotThrown") // false positive
retryCatching(bot.client.serverList.size) {
retryCatching(bot.client.serverList.size, except = LoginFailedException::class) {
val pair = bot.client.serverList.random()
kotlin.runCatching {
block(pair.first, pair.second)
......
......@@ -18,8 +18,10 @@ import io.ktor.http.content.OutgoingContent
import io.ktor.http.userAgent
import io.ktor.utils.io.ByteWriteChannel
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.io.ByteReadChannel
import kotlinx.coroutines.isActive
import kotlinx.io.InputStream
import kotlinx.io.core.Input
import kotlinx.io.core.discardExact
......@@ -29,6 +31,7 @@ import kotlinx.serialization.InternalSerializationApi
import net.mamoe.mirai.qqandroid.network.QQAndroidClient
import net.mamoe.mirai.qqandroid.network.protocol.data.proto.CSDataHighwayHead
import net.mamoe.mirai.qqandroid.utils.ByteArrayPool
import net.mamoe.mirai.qqandroid.utils.NoRouteToHostException
import net.mamoe.mirai.qqandroid.utils.PlatformSocket
import net.mamoe.mirai.qqandroid.utils.io.serialization.readProtoBuf
import net.mamoe.mirai.qqandroid.utils.io.withUse
......@@ -112,7 +115,14 @@ internal object HighwayHelper {
// require(commandId == 2 || commandId == 1) { "bad commandId. Must be 1 or 2" }
val socket = PlatformSocket()
socket.connect(serverIp, serverPort)
while (client.bot.network.isActive) {
try {
socket.connect(serverIp, serverPort)
break
} catch (e: NoRouteToHostException) {
delay(3000)
}
}
socket.use {
createImageDataPacketSequence(
client = client,
......
......@@ -11,12 +11,14 @@ package net.mamoe.mirai.qqandroid.utils
import kotlinx.io.core.ByteReadPacket
import kotlinx.io.core.Closeable
import net.mamoe.mirai.utils.MiraiInternalAPI
import kotlinx.io.errors.IOException
import net.mamoe.mirai.utils.Throws
/**
* 多平台适配的 TCP Socket.
*/
internal expect class PlatformSocket() : Closeable {
@Throws(NoRouteToHostException::class)
suspend fun connect(serverHost: String, serverPort: Int)
/**
......@@ -37,4 +39,7 @@ internal expect class PlatformSocket() : Closeable {
val isOpen: Boolean
override fun close()
}
\ No newline at end of file
}
expect open class SocketException : IOException
expect class NoRouteToHostException : SocketException
\ No newline at end of file
......@@ -13,29 +13,26 @@
package net.mamoe.mirai.qqandroid.utils
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName
import kotlin.reflect.KClass
@PublishedApi
internal expect fun Throwable.addSuppressedMirai(e: Throwable)
@OptIn(ExperimentalContracts::class)
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "RESULT_CLASS_IN_RETURN_TYPE")
@kotlin.internal.InlineOnly
internal inline fun <R> retryCatching(n: Int, block: () -> R): Result<R> {
contract {
callsInPlace(block, InvocationKind.AT_LEAST_ONCE)
}
internal inline fun <R> retryCatching(n: Int, except: KClass<out Throwable>? = null, block: () -> R): Result<R> {
require(n >= 0) { "param n for retryCatching must not be negative" }
var exception: Throwable? = null
repeat(n) {
try {
return Result.success(block())
} catch (e: Throwable) {
if (except?.isInstance(e) == true) {
return Result.failure(e)
}
exception?.addSuppressedMirai(e)
exception = e
}
......
......@@ -14,13 +14,13 @@ import kotlinx.coroutines.withContext
import kotlinx.io.core.ByteReadPacket
import kotlinx.io.core.Closeable
import kotlinx.io.core.ExperimentalIoApi
import kotlinx.io.errors.IOException
import kotlinx.io.streams.readPacketAtMost
import kotlinx.io.streams.writePacket
import net.mamoe.mirai.utils.MiraiInternalAPI
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.IOException
import java.net.Socket
import java.net.SocketException
/**
* 多平台适配的 TCP Socket.
......@@ -42,6 +42,7 @@ internal actual class PlatformSocket : Closeable {
@PublishedApi
internal lateinit var writeChannel: BufferedOutputStream
@PublishedApi
internal lateinit var readChannel: BufferedInputStream
......@@ -87,4 +88,8 @@ internal actual class PlatformSocket : Closeable {
writeChannel = socket.getOutputStream().buffered()
}
}
}
\ No newline at end of file
}
actual typealias NoRouteToHostException = java.net.NoRouteToHostException
actual typealias SocketException = SocketException
\ No newline at end of file
......@@ -22,7 +22,7 @@ import net.mamoe.mirai.network.ForceOfflineException
import net.mamoe.mirai.network.LoginFailedException
import net.mamoe.mirai.network.closeAndJoin
import net.mamoe.mirai.utils.*
import net.mamoe.mirai.utils.internal.tryNTimesOrException
import net.mamoe.mirai.utils.internal.retryCatching
import kotlin.coroutines.CoroutineContext
/*
......@@ -83,102 +83,135 @@ abstract class BotImpl<N : BotNetworkHandler> constructor(
@Suppress("PropertyName")
internal lateinit var _network: N
/**
* Close server connection, resend login packet, BUT DOESN'T [BotNetworkHandler.init]
*/
@ThisApiMustBeUsedInWithConnectionLockBlock
@Throws(LoginFailedException::class) // only
protected abstract suspend fun relogin(cause: Throwable?)
@Suppress("unused")
private val offlineListener: Listener<BotOfflineEvent> = this.subscribeAlways { event ->
when (event) {
is BotOfflineEvent.Dropped,
is BotOfflineEvent.RequireReconnect
-> {
if (!_network.isActive) {
return@subscribeAlways
private val offlineListener: Listener<BotOfflineEvent> =
this@BotImpl.subscribeAlways(concurrency = Listener.ConcurrencyKind.LOCKED) { event ->
if (network.areYouOk()) {
// avoid concurrent re-login tasks
return@subscribeAlways
}
when (event) {
is BotOfflineEvent.Dropped,
is BotOfflineEvent.RequireReconnect
-> {
if (!_network.isActive) {
// normally closed
return@subscribeAlways
}
bot.logger.info { "Connection dropped by server or lost, retrying login" }
retryCatching(configuration.reconnectionRetryTimes,
except = LoginFailedException::class) { tryCount, _ ->
if (tryCount != 0) {
delay(configuration.reconnectPeriodMillis)
}
network.withConnectionLock {
/**
* [BotImpl.relogin] only, no [BotNetworkHandler.init]
*/
@OptIn(ThisApiMustBeUsedInWithConnectionLockBlock::class)
relogin((event as? BotOfflineEvent.Dropped)?.cause)
}
logger.info { "Reconnected successfully" }
BotReloginEvent(bot, (event as? BotOfflineEvent.Dropped)?.cause).broadcast()
return@subscribeAlways
}.getOrElse {
logger.info { "Cannot reconnect" }
throw it
}
}
bot.logger.info("Connection dropped by server or lost, retrying login")
tryNTimesOrException(configuration.reconnectionRetryTimes) { tryCount ->
if (tryCount != 0) {
delay(configuration.reconnectPeriodMillis)
is BotOfflineEvent.Active -> {
val msg = if (event.cause == null) {
""
} else {
" with exception: " + event.cause.message
}
relogin((event as? BotOfflineEvent.Dropped)?.cause)
logger.info("Reconnected successfully")
BotReloginEvent(bot, (event as? BotOfflineEvent.Dropped)?.cause).broadcast()
return@subscribeAlways
}?.let {
logger.info("Cannot reconnect")
throw it
bot.logger.info { "Bot is closed manually$msg" }
closeAndJoin(CancellationException(event.toString()))
}
}
is BotOfflineEvent.Active -> {
val msg = if (event.cause == null) {
""
} else {
" with exception: " + event.cause.message
is BotOfflineEvent.Force -> {
bot.logger.info { "Connection occupied by another android device: ${event.message}" }
closeAndJoin(ForceOfflineException(event.toString()))
}
bot.logger.info { "Bot is closed manually$msg" }
closeAndJoin(CancellationException(event.toString()))
}
is BotOfflineEvent.Force -> {
bot.logger.info { "Connection occupied by another android device: ${event.message}" }
closeAndJoin(ForceOfflineException(event.toString()))
}
}
}
/**
* **Exposed public API**
* [BotImpl.relogin] && [BotNetworkHandler.init]
*/
final override suspend fun login() {
logger.info("Logging in...")
reinitializeNetworkHandler(null)
logger.info("Login successful")
}
private suspend fun reinitializeNetworkHandler(
cause: Throwable?
) {
suspend fun doRelogin() {
while (true) {
_network = createNetworkHandler(this.coroutineContext)
try {
relogin(null)
return
} catch (e: LoginFailedException) {
throw e
} catch (e: Exception) {
network.logger.error(e)
_network.closeAndJoin(e)
@ThisApiMustBeUsedInWithConnectionLockBlock
suspend fun reinitializeNetworkHandler(cause: Throwable?) {
suspend fun doRelogin() {
while (true) {
_network = createNetworkHandler(this.coroutineContext)
try {
@OptIn(ThisApiMustBeUsedInWithConnectionLockBlock::class)
relogin(null)
return
} catch (e: LoginFailedException) {
throw e
} catch (e: Exception) {
network.logger.error(e)
_network.closeAndJoin(e)
}
logger.warning("Login failed. Retrying in 3s...")
delay(3000)
}
logger.warning("Login failed. Retrying in 3s...")
delay(3000)
}
}
suspend fun doInit() {
tryNTimesOrException(2, onRetry = {
if (!isActive) {
logger.error("Cannot init due to fatal error")
logger.error(it)
}
}) {
if (it != 0) {
logger.warning("Init failed. Retrying in 3s...")
delay(3000)
suspend fun doInit() {
retryCatching(2) { count, lastException ->
if (count != 0) {
if (!isActive) {
logger.error("Cannot init due to fatal error")
if (lastException == null) {
logger.error("<no exception>")
} else {
logger.error(lastException)
}
}
logger.warning("Init failed. Retrying in 3s...")
delay(3000)
}
_network.init()
}.getOrElse {
network.logger.error(it)
logger.error("Cannot init. some features may be affected")
}
_network.init()
}?.let {
network.logger.error(it)
logger.error("Cannot init. some features may be affected")
}
}
// logger.info("Initializing BotNetworkHandler")
// logger.info("Initializing BotNetworkHandler")
if (::_network.isInitialized) {
BotReloginEvent(this, cause).broadcast()
doRelogin()
return
}
if (::_network.isInitialized) {
BotReloginEvent(this, cause).broadcast()
doRelogin()
return
doInit()
}
doRelogin()
doInit()
logger.info("Logging in...")
if (::_network.isInitialized) {
network.withConnectionLock {
@OptIn(ThisApiMustBeUsedInWithConnectionLockBlock::class)
reinitializeNetworkHandler(null)
}
} else {
@OptIn(ThisApiMustBeUsedInWithConnectionLockBlock::class)
reinitializeNetworkHandler(null)
}
logger.info("Login successful")
}
protected abstract fun createNetworkHandler(coroutineContext: CoroutineContext): N
......@@ -203,6 +236,9 @@ abstract class BotImpl<N : BotNetworkHandler> constructor(
// already cancelled
return
}
this.launch {
BotOfflineEvent.Active(this@BotImpl, cause).broadcast()
}
if (cause == null) {
this.cancel()
} else {
......@@ -210,3 +246,7 @@ abstract class BotImpl<N : BotNetworkHandler> constructor(
}
}
}
@RequiresOptIn(level = RequiresOptIn.Level.ERROR)
internal annotation class ThisApiMustBeUsedInWithConnectionLockBlock
\ No newline at end of file
......@@ -15,9 +15,12 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.Bot
import net.mamoe.mirai.utils.MiraiInternalAPI
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.WeakRefProperty
/**
* Mirai 的网络处理器, 它承担所有数据包([Packet])的处理任务.
......@@ -34,12 +37,23 @@ import net.mamoe.mirai.utils.MiraiLogger
* - 所有数据包处理和发送
*
* [BotNetworkHandler.close] 时将会 [取消][Job.cancel] 所有此作用域下的协程
*
* @suppress 此为**内部 API**, 可能在任意时刻被改动, 且不会给出任何警告.
*/
@Suppress("PropertyName")
abstract class BotNetworkHandler : CoroutineScope {
/*
此为**内部 API**, 可能在任意时刻被改动, 且不会给出任何警告.
此为**内部 API**, 可能在任意时刻被改动, 且不会给出任何警告.
此为**内部 API**, 可能在任意时刻被改动, 且不会给出任何警告.
*/
/**
* 所属 [Bot]. 为弱引用
*/
@WeakRefProperty
abstract val bot: Bot
/**
......@@ -67,7 +81,7 @@ abstract class BotNetworkHandler : CoroutineScope {
*/
@Suppress("SpellCheckingInspection")
@MiraiInternalAPI
abstract suspend fun relogin(host: String, port: Int, cause: Throwable? = null)
abstract suspend fun closeEverythingAndRelogin(host: String, port: Int, cause: Throwable? = null)
/**
* 初始化获取好友列表等值.
......@@ -84,6 +98,14 @@ abstract class BotNetworkHandler : CoroutineScope {
*/
abstract suspend fun join()
abstract fun areYouOk(): Boolean
private val connectionLock: Mutex = Mutex()
internal suspend inline fun withConnectionLock(block: BotNetworkHandler.() -> Unit) {
connectionLock.withLock { if (areYouOk()) return else block() }
}
/**
* 关闭网络接口, 停止所有有关协程和任务
*
......
......@@ -9,31 +9,34 @@
package net.mamoe.mirai.utils.internal
import net.mamoe.mirai.utils.MiraiInternalAPI
import kotlin.reflect.KClass
@PublishedApi
internal expect fun Throwable.addSuppressedMirai(e: Throwable)
@MiraiInternalAPI
@Suppress("DuplicatedCode")
internal inline fun <R> tryNTimesOrException(
repeat: Int,
onRetry: (Throwable?) -> Unit = {},
block: (Int) -> R
): Throwable? {
var lastException: Throwable? = null
repeat(repeat) {
// Currently we can't share internal code between modules.
@Suppress("DuplicatedCode", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "RESULT_CLASS_IN_RETURN_TYPE")
@kotlin.internal.InlineOnly
internal inline fun <R> retryCatching(
n: Int,
except: KClass<out Throwable>? = null,
block: (count: Int, lastException: Throwable?) -> R
): Result<R> {
require(n >= 0) {
"param n for retryCatching must not be negative"
}
var exception: Throwable? = null
repeat(n) {
try {
block(it)
return null
return Result.success(block(it, exception))
} catch (e: Throwable) {
if (lastException == null) {
lastException = e
} else lastException!!.addSuppressedMirai(e)
if (except?.isInstance(e) == true) {
return Result.failure(e)
}
exception?.addSuppressedMirai(e)
exception = e
}
onRetry(lastException)
}
return lastException!!
return Result.failure(exception!!)
}
\ No newline at end of file
......@@ -40,7 +40,7 @@ actual abstract class BotJavaFriendlyAPI actual constructor() {
*
* 一般情况下不需要重新登录. Mirai 能够自动处理掉线情况.
*
* 最终调用 [net.mamoe.mirai.network.BotNetworkHandler.relogin]
* 最终调用 [net.mamoe.mirai.network.BotNetworkHandler.closeEverythingAndRelogin]
*
* @throws LoginFailedException
*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment