Commit 680dc0f0 authored by Him188's avatar Him188

Fix memory leak on failure on image uploading (Fix #385, fix #387)

parent dfddfae8
...@@ -64,26 +64,22 @@ internal class ChunkedInput( ...@@ -64,26 +64,22 @@ internal class ChunkedInput(
* *
* 若 [ByteReadPacket.remaining] 小于 [sizePerPacket], 将会返回唯一元素 [this] 的 [Sequence] * 若 [ByteReadPacket.remaining] 小于 [sizePerPacket], 将会返回唯一元素 [this] 的 [Sequence]
*/ */
internal fun ByteReadPacket.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { internal fun ByteReadPacket.chunkedFlow(sizePerPacket: Int, buffer: ByteArray): Flow<ChunkedInput> {
ByteArrayPool.checkBufferSize(sizePerPacket) ByteArrayPool.checkBufferSize(sizePerPacket)
if (this.remaining <= sizePerPacket.toLong()) { if (this.remaining <= sizePerPacket.toLong()) {
ByteArrayPool.useInstance { buffer -> return flowOf(
return flowOf( ChunkedInput(
ChunkedInput( buffer,
buffer, this.readAvailable(buffer, 0, sizePerPacket)
this.readAvailable(buffer, 0, sizePerPacket)
)
) )
} )
} }
return flow { return flow {
ByteArrayPool.useInstance { buffer -> val chunkedInput = ChunkedInput(buffer, 0)
val chunkedInput = ChunkedInput(buffer, 0) do {
do { chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, sizePerPacket)
chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, sizePerPacket) emit(chunkedInput)
emit(chunkedInput) } while (this@chunkedFlow.isNotEmpty)
} while (this@chunkedFlow.isNotEmpty)
}
} }
} }
...@@ -93,19 +89,17 @@ internal fun ByteReadPacket.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> ...@@ -93,19 +89,17 @@ internal fun ByteReadPacket.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput>
* 对于一个 1000 长度的 [ByteReadChannel] 和参数 [sizePerPacket] = 300, 将会产生含四个元素的 [Sequence], * 对于一个 1000 长度的 [ByteReadChannel] 和参数 [sizePerPacket] = 300, 将会产生含四个元素的 [Sequence],
* 其长度分别为: 300, 300, 300, 100. * 其长度分别为: 300, 300, 300, 100.
*/ */
internal fun ByteReadChannel.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { internal fun ByteReadChannel.chunkedFlow(sizePerPacket: Int, buffer: ByteArray): Flow<ChunkedInput> {
ByteArrayPool.checkBufferSize(sizePerPacket) ByteArrayPool.checkBufferSize(sizePerPacket)
if (this.isClosedForRead) { if (this.isClosedForRead) {
return flowOf() return flowOf()
} }
return flow { return flow {
ByteArrayPool.useInstance { buffer -> val chunkedInput = ChunkedInput(buffer, 0)
val chunkedInput = ChunkedInput(buffer, 0) do {
do { chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, sizePerPacket)
chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, sizePerPacket) emit(chunkedInput)
emit(chunkedInput) } while (!this@chunkedFlow.isClosedForRead)
} while (!this@chunkedFlow.isClosedForRead)
}
} }
} }
...@@ -117,7 +111,7 @@ internal fun ByteReadChannel.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> ...@@ -117,7 +111,7 @@ internal fun ByteReadChannel.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput>
* 其长度分别为: 300, 300, 300, 100. * 其长度分别为: 300, 300, 300, 100.
*/ */
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { internal fun Input.chunkedFlow(sizePerPacket: Int, buffer: ByteArray): Flow<ChunkedInput> {
ByteArrayPool.checkBufferSize(sizePerPacket) ByteArrayPool.checkBufferSize(sizePerPacket)
if (this.endOfInput) { if (this.endOfInput) {
...@@ -125,12 +119,10 @@ internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { ...@@ -125,12 +119,10 @@ internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
} }
return flow { return flow {
ByteArrayPool.useInstance { buffer -> val chunkedInput = ChunkedInput(buffer, 0)
val chunkedInput = ChunkedInput(buffer, 0) while (!this@chunkedFlow.endOfInput) {
while (!this@chunkedFlow.endOfInput) { chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, sizePerPacket)
chunkedInput.size = this@chunkedFlow.readAvailable(buffer, 0, sizePerPacket) emit(chunkedInput)
emit(chunkedInput)
}
} }
} }
} }
...@@ -144,16 +136,14 @@ internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { ...@@ -144,16 +136,14 @@ internal fun Input.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> {
* 若 [ByteReadPacket.remaining] 小于 [sizePerPacket], 将会返回唯一元素 [this] 的 [Sequence] * 若 [ByteReadPacket.remaining] 小于 [sizePerPacket], 将会返回唯一元素 [this] 的 [Sequence]
*/ */
@OptIn(ExperimentalCoroutinesApi::class, InternalSerializationApi::class) @OptIn(ExperimentalCoroutinesApi::class, InternalSerializationApi::class)
internal fun InputStream.chunkedFlow(sizePerPacket: Int): Flow<ChunkedInput> { internal fun InputStream.chunkedFlow(sizePerPacket: Int, buffer: ByteArray): Flow<ChunkedInput> {
ByteArrayPool.checkBufferSize(sizePerPacket) require(sizePerPacket <= buffer.size) { "sizePerPacket is too large. Maximum buffer size=buffer.size=${buffer.size}" }
return flow { return flow {
ByteArrayPool.useInstance { buffer -> val chunkedInput = ChunkedInput(buffer, 0)
val chunkedInput = ChunkedInput(buffer, 0) while (this@chunkedFlow.available() != 0) {
while (this@chunkedFlow.available() != 0) { chunkedInput.size = this@chunkedFlow.read(buffer, 0, sizePerPacket)
chunkedInput.size = this@chunkedFlow.read(buffer, 0, sizePerPacket) emit(chunkedInput)
emit(chunkedInput)
}
} }
} }
} }
\ No newline at end of file
...@@ -11,6 +11,8 @@ import net.mamoe.mirai.message.data.toLongUnsigned ...@@ -11,6 +11,8 @@ import net.mamoe.mirai.message.data.toLongUnsigned
import java.io.File import java.io.File
import java.io.InputStream import java.io.InputStream
internal const val DEFAULT_REUSABLE_INPUT_BUFFER_SIZE = 8192
internal actual fun ByteArray.asReusableInput(): ReusableInput { internal actual fun ByteArray.asReusableInput(): ReusableInput {
return object : ReusableInput { return object : ReusableInput {
override val md5: ByteArray = md5() override val md5: ByteArray = md5()
...@@ -18,9 +20,11 @@ internal actual fun ByteArray.asReusableInput(): ReusableInput { ...@@ -18,9 +20,11 @@ internal actual fun ByteArray.asReusableInput(): ReusableInput {
override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> { override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> {
return object : ChunkedFlowSession<ChunkedInput> { return object : ChunkedFlowSession<ChunkedInput> {
override val flow: Flow<ChunkedInput> = inputStream().chunkedFlow(sizePerPacket) private val stream = inputStream()
override val flow: Flow<ChunkedInput> = stream.chunkedFlow(sizePerPacket, ByteArray(DEFAULT_REUSABLE_INPUT_BUFFER_SIZE.coerceAtLeast(sizePerPacket)))
override fun close() { override fun close() {
stream.close()
// nothing to do // nothing to do
} }
} }
...@@ -46,7 +50,7 @@ internal fun File.asReusableInput(deleteOnClose: Boolean): ReusableInput { ...@@ -46,7 +50,7 @@ internal fun File.asReusableInput(deleteOnClose: Boolean): ReusableInput {
override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> { override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> {
val stream = inputStream() val stream = inputStream()
return object : ChunkedFlowSession<ChunkedInput> { return object : ChunkedFlowSession<ChunkedInput> {
override val flow: Flow<ChunkedInput> = stream.chunkedFlow(sizePerPacket) override val flow: Flow<ChunkedInput> = stream.chunkedFlow(sizePerPacket, ByteArray(DEFAULT_REUSABLE_INPUT_BUFFER_SIZE.coerceAtLeast(sizePerPacket)))
override fun close() { override fun close() {
stream.close() stream.close()
if (deleteOnClose) this@asReusableInput.delete() if (deleteOnClose) this@asReusableInput.delete()
...@@ -72,7 +76,7 @@ internal fun File.asReusableInput(deleteOnClose: Boolean, md5: ByteArray): Reusa ...@@ -72,7 +76,7 @@ internal fun File.asReusableInput(deleteOnClose: Boolean, md5: ByteArray): Reusa
override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> { override fun chunkedFlow(sizePerPacket: Int): ChunkedFlowSession<ChunkedInput> {
val stream = inputStream() val stream = inputStream()
return object : ChunkedFlowSession<ChunkedInput> { return object : ChunkedFlowSession<ChunkedInput> {
override val flow: Flow<ChunkedInput> = stream.chunkedFlow(sizePerPacket) override val flow: Flow<ChunkedInput> = stream.chunkedFlow(sizePerPacket, ByteArray(DEFAULT_REUSABLE_INPUT_BUFFER_SIZE.coerceAtLeast(sizePerPacket)))
override fun close() { override fun close() {
stream.close() stream.close()
if (deleteOnClose) this@asReusableInput.delete() if (deleteOnClose) this@asReusableInput.delete()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment