跳转至

脚本开发 / 观测云 DataKit、DataWay

DataKit、DataWay 连接器操作对象主要提供数据写入方法。

DFF.CONN(...)参数如下:

参数 类型 必须/默认值 说明
connector_id str 必须 连接器 ID
source str None 指定 Source
注意不要填写"mysql"等,防止与其他采集器冲突混淆
参数 类型 必须/默认值 说明
connector_id str 必须 连接器 ID
token str None 指定 Token
  • 一般性上报数据,请使用.write_by_category(...).write_by_category_many(...)方法
  • 一般性执行 DQL 语句,请使用.query(...)方法
  • 直接根据文档发送 GET 请求,请使用.get(...)方法
  • 直接根据文档发送 POST 请求,请使用.post_json(...)方法
  • 直接根据文档发送行协议数据,请使用.post_line_protocol(...)方法

DataKit 和 DataWay 之间绝大部分接口完全相同

由于 DataKit、DataWay 接口经常变动,本连接器并不会一对一封装所有的接口

由于不同版本的 DataKit、DataWay 对上报数据可能存在不同的要求或约束,请在阅读相关文档的基础上使用本连接器

.write_by_category(...)

write_by_category(...)方法用于向 DataKit、DataWay 写入特定类型的数据,参数如下:

参数 类型 必须/默认值 说明
category str 必须 数据类型,详见 观测云文档 / DataKit API
measurement str 必须 指标集名称
tags dict 必须 标签。键名和键值必须都为字符串
fields dict 必须 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一
timestamp int/long/float {当前时间} 时间戳,支持秒/毫秒/微秒/纳秒。
headers dict None 请求 Header 参数

headers参数于 3.3.0 版本添加

示例如下:

Python
1
2
3
tags   = { 'host': 'web-01' }
fields = { 'cpu' : 10 }
status_code, result = datakit.write_by_category(category='metric', measurement='主机监控', tags=tags, fields=fields)

.write_by_category_many(...)

write_by_category(...)的批量版本,参数如下:

参数 类型 必须/默认值 说明
category str 必须 数据类型,详见 观测云文档 / DataKit API
data list 必须 数据点列表
data[#].measurement str 必须 指标集名称
data[#].tags dict 必须 标签。键名和键值必须都为字符串
data[#].fields dict 必须 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一
data[#].timestamp int/long/float {当前时间} 时间戳,支持秒/毫秒/微秒/纳秒。
headers dict None 请求 Header 参数

headers参数于 3.3.0 版本添加

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
data = [
    {
        'measurement': '主机监控',
        'tags'       : { 'host' : 'web-01' },
        'fields'     : { 'value': 10 }
    },
    {
        'measurement': '主机监控',
        'tags'       : {'host' : 'web-02'},
        'fields'     : {'value': 20}
    }
]
status_code, result = datakit.write_by_category_many(category='metric', data=data)

.write_metric(...) / .write_point(...)

旧版方法,与.write_by_category(category='metric', ...)等价

示例如下:

Python
1
status_code, result = datakit.write_metric(measurement='主机监控', tags={'host': 'web-01'}, fields={'cpu': 10})

.write_metrics(...) / .write_metric_many(...) / .write_points(...)

旧版方法,与.write_by_category_many(category='metric', ...)等价

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
data = [
    {
        'measurement': '主机监控',
        'tags'       : {'host' : 'web-01'},
        'fields'     : {'value': 10}
    },
    {
        'measurement': '主机监控',
        'tags'       : {'host' : 'web-02'},
        'fields'     : {'value': 20}
    }
]
status_code, result = datakit.write_metrics(data=data)

.write_logging(...) / .write_logging_many(...)

旧版方法,与.write_by_category_many(category='logging', ...)等价

.query(...)

本方法支持 DataKit API DQL 查询接口中的参数,详细文档见 观测云文档 / DataKit API 文档

query(...)方法用于通过 DataKit、DataWay 执行 DQL 语句,参数如下:

参数 类型 必须/默认值 说明
dql str 必须 DQL 语句
dict_output bool False 是否自动转换数据为dict
raw bool False 是否返回原始响应。开启后dict_output参数无效。
all_series bool False 是否自动通过slimitsoffset翻页以获取全部时间线。
{DataKit、DataWay 原生参数} - - 透传至queries[0].{DataKit、DataWay 原生参数}

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import time
import json

@DFF.API('Run DQL via DataKit')
def run_dql_via_datakit():
    datakit = DFF.CONN('datakit')

    # 使用 DataKit 原生参数`time_range`,限制最近 1 小时数据
    time_range = [
        int(time.time() - 3600) * 1000,
        int(time.time()) * 1000,
    ]

    # 查询并以 Dict 形式返回数据
    status_code, result = datakit.query(dql='O::HOST:(host,load,create_time)', dict_output=True, time_range=time_range)
    print(json.dumps(result))

输出示例:

JSON
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
  "series": [
    [
      {
        "time": 1622463105293,
        "host": "iZbp152ke14timzud0du15Z",
        "load": 2.18,
        "create_time": 1622429576363,
        "tags": {}
      },
      {
        "time": 1622462905921,
        "host": "ubuntu18-base",
        "load": 0.08,
        "create_time": 1622268259114,
        "tags": {}
      },
      {
        "time": 1622461264175,
        "host": "shenrongMacBook.local",
        "load": 2.395508,
        "create_time": 1622427320834,
        "tags": {}
      }
    ]
  ]
}

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import time
import json

@DFF.API('Run DQL via DataKit')
def run_dql_via_datakit():
    datakit = DFF.CONN('datakit')

    # 添加 raw 参数,获取 DQL 查询原始值
    time_range = [
        int(time.time() - 3600) * 1000,
        int(time.time()) * 1000,
    ]

    # 查询并以 DataKit 原始返回值格式返回数据
    status_code, result = datakit.query(dql='O::HOST:(host,load,create_time)', raw=True, time_range=time_range)
    print(json.dumps(result, indent=2))

输出示例:

JSON
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{
  "content": [
    {
      "series": [
        {
          "name": "HOST",
          "columns": [
            "time",
            "host",
            "load",
            "create_time"
          ],
          "values": [
            [
              1622463165152,
              "iZbp152ke14timzud0du15Z",
              1.92,
              1622429576363
            ],
            [
              1622462905921,
              "ubuntu18-base",
              0.08,
              1622268259114
            ],
            [
              1622461264175,
              "shenrongMacBook.local",
              2.395508,
              1622427320834
            ]
          ]
        }
      ],
      "cost": "1ms",
      "total_hits": 3
    }
  ]
}

.get(...)

本方法为通用处理方法,具体参数格式、内容等请参考 观测云文档 / DataKit API

get(...)方法用于向 DataKit、DataWay 发送一个 GET 请求,参数如下:

参数 类型 必须/默认值 说明
path str 必须 请求路径
query dict None 请求 URL 参数
headers dict None 请求 Header 参数

.post_json(...)

本方法为通用处理方法,具体参数格式、内容等请参考 观测云文档 / DataKit API

post_json(...)方法用于向 DataKit、DataWay 以 JSON 格式发送一个 POST 请求,参数如下:

参数 类型 必须/默认值 说明
path str 必须 请求路径
json_obj dict/list 必须 需要发送的 JSON 对象
query dict None 请求 URL 参数
headers dict None 请求 Header 参数

path参数于 1.6.8 版本调整为第一个参数

.post_line_protocol(...)

本方法为通用处理方法,具体参数格式、内容等请参考 观测云文档 / DataKit API

post_line_protocol(...)方法用于向 DataKit、DataWay 以行协议格式发送一个 POST 请求,参数如下:

参数 类型 必须/默认值 说明
path str 必须 请求路径
points list 必须 数据点格式的数据列表
points[#].measurement str 必须 指标集名称
points[#].tags dict 必须 标签。键名和键值必须都为字符串
points[#].fields dict 必须 指标。键名必须为字符串,键值可以为字符串/整数/浮点数/布尔值之一
points[#].timestamp int/long/float {当前时间} 时间戳,支持秒/毫秒/微秒/纳秒。
query dict None 请求 URL 参数
headers dict None 请求 Header 参数

path参数于 1.6.8 版本调整为第一个参数