跳到主要内容

协程的通信

信息

我们都知道线程之间的通信有很多种方式,一个经典的问题就是生产者与消费者的问题,那么协程之间的通信有哪些方式呢?本文将会介绍协程之间的通信方式。

1.1 协程间通信

Channel 用于协程间的通信。Channel 本质上是一个并发安全的队列,类似BlockingQueue,在使用时,通过调用同一个 Channel 对象的 send 和 receive 方法实现通信,下面看一个基本的案例

Channel is a non-blocking primitive for communication between a sender (via SendChannel) and a receiver (via ReceiveChannel). Conceptually, a channel is similar to Java's java.util.concurrent.BlockingQueue, but it has suspending operations instead of blocking ones and can be closed.

package channel

import javafx.application.Application.launch
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

/**
* @description:
* @author: shu
* @createDate: 2023/8/18 8:59
* @version: 1.0
*/
// 总结:Channel是协程之间通信的一种方式,类似于阻塞队列,可以实现多个协程之间的数据传输
// Channel是线程安全的,可以在多个协程之间共享,可以实现多个协程之间的数据传输
// send()方法是挂起函数,只能在协程中调用,否则会报错
// receive()方法是挂起函数,只能在协程中调用,否则会报错

// 测试
suspend fun main() {
val channel = Channel<Int>()
// 启动一个协程
GlobalScope.launch {
// 发送数据
channel.send(1)
channel.send(2)
println("send done")
}
// 接收数据
println(channel.receive())
println(channel.receive())
println("receive done")

}
  • 关闭 channel 可以通过调用 close 方法来关闭,关闭后,不能再发送数据,但是可以继续接收数据,如果没有数据了,那么接收数据的方法会抛出异常,如果不想抛出异常,可以使用receiveOrNull方法,该方法会返回 null,如果 channel 已经关闭,那么该方法会返回 null,如果 channel 没有关闭,那么该方法会返回数据,如果 channel 中没有数据,那么该方法会挂起,直到有数据或者 channel 关闭。
/**
* @description:
* @author: shu
* @createDate: 2023/8/18 8:59
* @version: 1.0
*/
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 我们结束发送
}
// 这里我们使用 `for` 循环来打印所有被接收到的元素(直到通道被关闭)
for (y in channel) println(y)
println("Done!")
  • 迭代 channel 可以通过 for 循环来迭代,当 channel 关闭后,for 循环会结束,如果 channel 没有关闭,那么 for 循环会一直阻塞,直到 channel 关闭或者有数据。
package channel

import javafx.application.Application.launch
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

/**
* @description:
* @author: shu
* @createDate: 2023/8/18 8:59
* @version: 1.0
*/
// 总结:Channel是协程之间通信的一种方式,类似于阻塞队列,可以实现多个协程之间的数据传输
// Channel是线程安全的,可以在多个协程之间共享,可以实现多个协程之间的数据传输
// send()方法是挂起函数,只能在协程中调用,否则会报错
// receive()方法是挂起函数,只能在协程中调用,否则会报错

// 测试
suspend fun main() {
val channel = Channel<Int>()
// 启动一个协程
GlobalScope.launch {
// 发送数据
channel.send(1)
channel.send(2)
channel.close()
println("send done")
}
// 接收数据
// println(channel.receive())
// println(channel.receive())
// 迭代
for (i in channel) {
println(i)
}
println("receive done")
}
  • 生产者与消费者案例
package channel

/**
* @description:
* @author: shu
* @createDate: 2023/8/18 9:12
* @version: 1.0
*/
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
val channel = Channel<Int>()

// 生产者
val producer = launch {
for (i in 1..5) {
println("Producing $i")
channel.send(i) // 发送数据到通道
delay(1000) // 模拟生产耗时
}
channel.close() // 关闭通道
}

// 消费者
val consumer = launch {
for (value in channel) {
println("Consuming $value")
delay(1500) // 模拟消费耗时
}
}

producer.join() // 等待生产者完成
consumer.join() // 等待消费者完成
}

在这个例子中,我们创建了一个Channel来作为生产者和消费者之间的通信介质。生产者使用send函数向通道发送数据,消费者使用for循环从通道中接收数据。需要注意的是,当生产者完成数据生产后,我们调用了channel.close()来关闭通道,这将通知消费者没有更多的数据可以接收了。

1.2 通道的容量

Channel 方法不是 Channel 的构造方法,而是一个工厂方法,

public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> {}

在创建 Channel 时可以指定容量: RENDEZVOUS:创建一个容量为 0 的 Channel,类似于 SynchronousQueue。send 之后会挂起,直到被 receive。枚举值为 0。 UNLIMITED:创建一个容量无限的 Channel,内部通过链表实现。枚举值为 Int.MAX_VALUE。 CONFLATED:创建一个容量为 1 的 Channel,当后一个的数据会覆盖前一个数据。枚举值为-1。 BUFFERED:创建一个默认容量的 Channel,默认容量为 kotlinx.coroutines.channels.defaultBuffer 配置变量指定的值,未配置情况下,默认为 64。枚举值为-2。 如果 capacity 的值不为上述的枚举值,则创建一个指定容量的 Channel。

Channel 是 Kotlin 协程库中用于实现协程间通信的一种机制,类似于管道。Channel 允许在不同协程之间传递数据,这些协程可以在同一个线程中运行,也可以在不同的线程中运行。

Channel 有一个容量(capacity)的概念,它指定了 Channel 中可以存放的元素数量。容量可以控制协程间通信的行为。在创建 Channel 时,可以指定容量大小。容量可以为无限大,也可以是一个正整数。

容量的大小影响了 Channel 的阻塞行为:

  1. 如果容量为 0,即为无缓冲的 Channel(也称为 Synchronous Channel),发送方在发送数据后会被阻塞,直到接收方准备好接收数据。

  2. 如果容量大于 0,即为有缓冲的 Channel,发送方可以发送数据到 Channel 中,只有当 Channel 已满时发送方才会被阻塞。

  3. 如果容量为无限大,即 Channel 没有容量限制,发送方不会被阻塞,除非接收方准备好接收数据。

在创建 Channel 时,可以使用 Channel() 构造函数并传递容量参数来指定容量大小。例如:

val channel = Channel<Int>(capacity = 10) // 创建一个容量为10的有缓冲的 Channel

需要根据你的实际需求选择合适的容量大小。如果你希望在发送和接收之间有一定的异步缓冲,可以选择适当的容量,但要注意避免资源耗尽问题。

1.3 produce 与 actor

produceactor 都是 Kotlin 协程库中用于协程间通信的机制,但它们在功能和使用方式上有一些不同。

  1. produce:

produce 是一个函数,用于创建一个生产者通道(producer channel)。生产者通道允许一个协程生成(produce)数据并将数据发送到通道中,然后另一个协程可以异步地从通道中接收数据。生产者通道使用协程来生成数据,是一种适合生产者-消费者模式的通信机制。

示例:

val channel = produce<Int> {
for (i in 1..5) {
send(i) // 将数据发送到通道
}
}
  1. actor:

actor 是一个函数,用于创建一个有状态的协程。通过 actor 创建的协程具有一个可变的状态,并且可以接收和处理消息。每个 actor 都在单独的协程中运行,从而避免了多个协程同时访问和修改状态的并发问题。

actor 具有消息处理的能力,其他协程可以通过向 actor 发送消息来与其进行通信。actor 中的状态由 actor 协程自己管理,确保了状态的一致性。

示例:

val counterActor = actor<Int> {
var count = 0
for (msg in channel) {
count += msg
println("Current count: $count")
}
}
  1. 区别:
  • produce 创建的是一个生产者通道,用于生产者和消费者之间的数据传递。生产者可以异步地生成数据并发送到通道,而消费者可以异步地从通道中接收数据。

  • actor 创建的是一个有状态的协程,用于实现状态的封装和消息处理。actor 协程之间的状态不会互相干扰,因为每个 actor 都在自己的协程中运行。