跳转至

脚本开发 / InfluxDB

InfluxDB 连接器操作对象为 Python 第三方包 influxdb(版本 5.2.3)的封装,主要提供一些用于查询 InfluxDB 的方法。 本连接器兼容以下数据库:

  • 阿里云时序数据库 InfluxDB 版

本连接器基于 HTTP 协议连接

DFF.CONN(...)参数如下:

参数 类型 必须/默认值 说明
connector_id str 必须 连接器 ID
database str None 指定数据库

.query(...)

query(...)方法用于执行 InfluxQL 语句,参数如下:

参数 类型 必须/默认值 说明
sql str 必须 InfluxQL 语句,可包含绑定参数占位符,形式为$var_name
bind_params dict None 绑定参数
database str None 本次查询指定数据库
dict_output dict False 返回数据自动转换为{列名:值}形式

示例如下:

Python
1
2
3
4
5
6
7
8
9
sql = 'SELECT * FROM demo WHERE city = $city LIMIT 5'
bind_params = {'city': 'hangzhou'}
result = db.query(sql, bind_params=bind_params, database='demo')
# {'series': [{'columns': ['time', 'city', 'hostname', 'status', 'value'], 'name': 'demo', 'values': [['2018-12-31T16:00:10Z', 'hangzhou', 'webserver', 'UNKNOW', 90], ['2018-12-31T16:00:20Z', 'hangzhou', 'jira', 'running', 40], ['2018-12-31T16:00:50Z', 'hangzhou', 'database', 'running', 50], ['2018-12-31T16:01:00Z', 'hangzhou', 'jira', 'stopped', 40], ['2018-12-31T16:02:00Z', 'hangzhou', 'rancher', 'UNKNOW', 90]]}]}

sql = 'SELECT * FROM demo WHERE city = $city LIMIT 5'
bind_params = {'city': 'hangzhou'}
result = db.query(sql, bind_params=bind_params, database='demo', dict_output=True)
# {'series': [[{'city': 'hangzhou', 'hostname': 'webserver', 'status': 'UNKNOW', 'time': '2018-12-31T16:00:10Z', 'value': 90}, {'city': 'hangzhou', 'hostname': 'jira', 'status': 'running', 'time': '2018-12-31T16:00:20Z', 'value': 40}, {'city': 'hangzhou', 'hostname': 'database', 'status': 'running', 'time': '2018-12-31T16:00:50Z', 'value': 50}, {'city': 'hangzhou', 'hostname': 'jira', 'status': 'stopped', 'time': '2018-12-31T16:01:00Z', 'value': 40}, {'city': 'hangzhou', 'hostname': 'rancher', 'status': 'UNKNOW', 'time': '2018-12-31T16:02:00Z', 'value': 90}]]}

.query2(...)

query2(...)方法同样用于执行 InfluxQL 语句,但参数占位符不同,使用问号?作为参数占位符。参数如下:

参数 类型 必须/默认值 说明
sql str 必须 InfluxQL 语句,可包含参数占位符。
?表示需要转义的参数;
??表示不需要转义的参数
sql_params list None InfluxQL 参数
database str None 本次查询指定数据库
dict_output dict False 返回数据自动转换为{"列名": "值"}形式

示例如下:

Python
1
2
3
sql = 'SELECT * FROM ?? WHERE city = ? LIMIT 5'
sql_params = ['demo', 'hangzhou']
result = db.query2(sql, sql_params=sql_params, dict_output=True)

.write_point(...)

此功能于 1.1.13 版本新增

write_point(...)方法用于写入单个数据点。参数如下:

参数 类型 必须/默认值 说明
measurement str 必须 指标集
fields dict{str: str/int/float/bool} 必须 字段
键名必须为 str
键值可以为 str/int/float/bool
tags dict{str: str} None 标签
键名、键值必须为都为 str
timestamp str/int None 时间
ISO 格式,如:2020-01-01T01:02:03Z
UNIX 时间戳如:1577840523
database str None 本次写入指定数据库

示例如下:

Python
1
2
3
fields = { 'cpu': 100, 'mem': 0.5 }
tags   = { 'host': 'web001' }
result = db.write_point(measurement='host_monitor', fields=fields, tags=tags)

.write_points(...)

此功能于 1.1.13 版本新增

write_points(...)方法用于批量写入数据点。参数如下:

参数 类型 必须/默认值 说明
points list 必须 数据点数组
points[#]['measurement'] str 必须 指标集
points[#]['fields'] dict{str: str/int/float/bool} 必须 字段
键名必须为 str
键值可以为 str/int/float/bool
points[#]['tags'] dict{str: str} None 标签
键名、键值必须为都为 str
points[#]['time'] str/int None 时间
ISO 格式,如:2020-01-01T01:02:03Z
UNIX 时间戳如:1577840523
database str None 本次写入指定数据库
Python
1
2
3
4
5
6
7
8
points = [
    {
        'measurement': 'host_monitor',
        'fields'     : { 'cpu': 100, 'mem': 0.5 },
        'tags'       : { 'host': 'web001' },
    }
]
result = db.write_points(points)