跳转至

脚本开发 / 连接器订阅

部分连接器支持订阅消息,DataFlux Func 提供了统一的方式进行订阅。

在较早版本中,曾名为「数据源」,现版本已改为「连接器」

1. 前言

由于 DataFlux Func 的脚本运行机制,函数启动后最终都必须会结束,不允许函数无限运行。

因此对于订阅这类长期驻留的处理,并不支持直接在 DataFlux Func 里编写消费者长期运行来实现。

而需要通过在连接器中指定订阅主题,由 DataFlux Func 主程序统一负责订阅消息。

当 DataFlux Func 主程序接收到消息后,会将消息转发给指定的消息处理函数进行处理,以此完成订阅消息处理。

2. 支持订阅的连接器

最新版 DataFlux Func 支持如下连接器的订阅

  • Redis
  • MQTT Broker (v5.0)
  • Kafka

3. 操作步骤

以订阅 Redis 消息并处理,具体操作步骤如下:

3.1 编写消息处理函数

消息处理函数具有固定的函数形式,如:

Python
1
2
3
4
@DFF.API('Message Handler')
def message_handler(topic, message):
    print('topic', topic)     # 主题
    print('message', message) # 内容

完成脚本后,保存并发布:

3.2 创建并配置连接器

除了填写基本配置外,需要额外填写订阅的主题 Topic 以及对应的处理函数。

处理函数即上文所编写的message_handler(...)函数:

3.3 发布消息并确认消息处理

当发布端发布了一条如下的消息后,消息会经由 DataFlux Func 主程序转发给上述message_handler(...)函数进行处理:

在连接器配置页面,对应的主题下方会出现最新消费信息:

点击后,可以看到更详细的任务信息:

至此,可以确认,在 Redis 中发布的消息确实被message_handler(...)函数接收并处理。

4. 发布消息

支持订阅消息的连接器一般都支持发布(publish)消息。

发布消息见 开发手册 / 连接器对象 API 对应连接器对象的 API,一般都为.publish(topic, message)形式。

5. 消息处理函数的任务记录

在最新版的 DataFlux Func 中,可以直接查看消息处理函数的任务记录。

如果订阅的消息数量庞大,记录每条消息的处理日志本身也可能会造成性能问题。可以参考 部署和维护 / 系统指标和任务记录 / 关闭本地函数任务记录 关闭「本地函数任务记录」,减轻 MySQL 存储压力

在旧版 DataFlux Func 中,消息处理函数不支持查询任务记录。如果想记录处理过程中产生的报错,可以参考如下代码实现:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import arrow
import traceback

def message_handler_impl(topic, message):
    # 此处实际消息处理函数
    # 此处假设产生了一个除0错误
    x = 100 / 0

@DFF.API('Message Handler')
def message_handler(topic, message):
    try:
        # 调用实际消息处理函数
        message_handler_impl(topic, message)

    except Exception as e:
        # 获取当前时间
        now_str = arrow.now('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')

        # 提取完整的错误信息
        error_stack = traceback.format_exc()

        # 错误信息保存到 DFF.CACHE 中
        latest_error = '\n'.join([ '【时间】', now_str, '【错误信息】', error_stack ])
        DFF.CACHE.set('latest_error', latest_error)

        # 错误需要重新抛出
        raise

当执行过程产生问题后,可以在「管理 / 函数缓存管理器」中查看具体信息,如:

6. 单订阅和多订阅

此功能于 1.7.31 版本新增

所有的订阅器,都运行在 DataFlux Func 的server服务中,在订阅接收到消息后,会根据配置生成函数执行任务,送往worker执行。

订阅器支持 2 种订阅方式:

  1. 单订阅:即无论开启多少个server副本,订阅器始终只会在其中一个server中运行
  2. 多订阅:当开启多个server副本时,订阅器会在每个server中都运行

对于 Redis 订阅器,由于不支持共享订阅处理,多订阅只会导致消息重复消费,因此目前只支持单订阅。

对于 MQTT、Kafka 订阅器,可以选择单订阅或者多订阅。

MQTT 在使用多订阅时,Topic 需要按照 $share/group_name/topic_name 方式进行共享订阅,否则会导致重复收到相同消息

有关 DataFlux Func 包含的服务,以及开启多个 server 副本,请参考 部署和维护 / 架构、扩容与限制资源

7. 提升订阅处理速度

此功能于 1.7.31 版本新增

对于 DataFlux Func 来说,由于订阅器运行于server服务中,但函数运行与worker服务中,因此订阅处理速度包含了两部分:订阅消息接收速度、订阅函数执行速度。

如果订阅的消息数量不大,但每条消息的处理较为复杂,耗时较长,那么需要增加worker的数量来保证消息及时处理。

如果订阅的消息数量巨大,那需要同时提高serverworker的数量,并将订阅器设置为「多订阅」模式。

有关如何进行系统扩容,请参考 部署和维护 / 架构、扩容与限制资源

8. 订阅限制

此功能于 1.7.31 版本新增

当订阅消息数量巨大时,由于服务端的发布-订阅方式不同,因此相应地存在不同的限制。

Redis / MQTT

Redis、MQTT 订阅器由于无法在订阅端控制消息接收速度,因此接收到的消息会进入内部的缓冲池中。

缓冲池默认大小为 5000,即做多允许 5000 条订阅消息驻留等待处理,期间如有更多的消息接收,则溢出的消息会被丢弃。

为解决上述问题,可以通过增加worker数量,提高订阅消息的处理速度。

Kafka

Kafka 订阅器由于可以在订阅端控制消息接收速度,因此不存在缓冲池,也不会丢弃消息。

当消息处理速度跟不上消息发布速度时,可以通过增加worker数量,提高订阅消息的处理速度。

9. 已知问题

目前已知问题如下。

MQTT 卡顿、无法接收消息

MQTT Broker 在连接 EMQX 时可能会遇到卡顿、无法收到消息的问题。

此问题在早期社区单机版 EMQX 中出现过,其他版本的 EMQX 尚未测试。

但连接 mosquitto 时未见此问题。

Kafka 启动后无法立刻进行消费

Kafka 连接器在订阅后,可能会在最初的几分钟内处于暂停状态,后续才会正常进行消息消费。

原因可能与 Kafka 的 Rebalance 机制有关