阿里云数据同步
旧版提示:本脚本包可以继续使用,但建议迁移至新版「集成」系列脚本。
本文档主要介绍如何使用脚本市场中「阿里云数据同步」脚本包同步阿里云平台的相关数据。
请始终使用最新版 DataFlux Func 进行操作
1. 背景
在使用观测云、TrueWatch 的过程中,云平台的一些数据可能无法通过 DataKit 直接采集。
因此,DataFlux Func 提供了与各云平台对应的数据同步脚本包。用户可以从脚本市场安装相关的数据同步脚本包,进行简单的配置后,即可同步云平台的数据。
本文假设用户已经了解并安装了相关脚本包。
有关如何在 DataFlux Func 的脚本市场中安装脚本版,请参考:
本文假设用户已经在 DataFlux Func 中正确连接了 DataKit。
有关如何在 DataFlux Func 中连接 DataKit,请参考:
2. 关于本脚本包
本脚本包主要用于阿里云平台数据的获取并同步至观测云、TrueWatch。
使用本脚本包,需要阿里云平台具有对应权限的 AK。您可在阿里云「访问控制」中创建 RAM 账号,并为 RAM 账号创建 AK 并授予所需权限。
为了您的账号安全,请勿直接使用账号 AK,或为 RAM 账号 AK 分配过大的权限
所需权限如下:
阿里云产品 |
AK 所需权限 |
云监控(Metric) |
AliyunCloudMonitorReadOnlyAccess |
日志服务(SLS) |
AliyunLogReadOnlyAccess |
也可以直接为 RAM 账号授予全部云资源的只读权限 ReadOnlyAccess
。
3. 典型代码示例
通过简单的配置和极少量代码,即可实现相关数据的同步功能。
以下典型代码示例为实现同步功能的最简单配置。
3.1 云监控(Metric)
Python |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 | import dataflux_aliyun__metric as metric
# 账号
ACCOUNTS = [
{
# 阿里云 AK
'akId' : '阿里云 AK ID',
'akSecret': '阿里云 AK Secret',
# 来自本账号的数据都自动添加以下标签
'extraTags': { 'account_name': 'metric_example' },
},
]
# 配置
OPTIONS = {
# 空配置表示获取所有指标数据
'acs_ecs_dashboard': {},
'acs_vpc_eip' : {},
}
@DFF.API('同步阿里云监控数据', fixed_crontab='* * * * *')
def run():
# 查询云监控数据
data = metric.query(ACCOUNTS, OPTIONS)
# 调用 DataKit 写入数据
DFF.CONN('datakit').write_metric_many(data=data)
|
3.2 日志服务(SLS)
Python |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 | import dataflux_aliyun__sls as sls
# 账号
ACCOUNTS = [
{
# 阿里云 AK
'akId' : '阿里云 AK ID',
'akSecret': '阿里云 AK Secret',
# 来自本账号的数据都自动添加以下标签
'extraTags': { 'account_name': 'sls_example' },
},
]
# 配置
OPTIONS = {
# 接入点
'cn-hangzhou.log.aliyuncs.com': {
# 项目名
'slsaudit-region-1947705082861112-cn-hangzhou': [
# 日志库
'oss_log',
'internal-etl-log',
],
},
}
@DFF.API('同步阿里云日志数据', fixed_crontab='* * * * *')
def run():
# 查询日志服务数据
data = sls.query(ACCOUNTS, OPTIONS)
# 调用 DataKit 写入数据
DFF.CONN('datakit').write_logging_many(data=data)
|
4. 更复杂的代码示例
对于同步功能有进一步定制化要求的,可以根据如下的复杂示例编写。
4.1 云监控(Metric)
Python |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 | import re
import json
import dataflux_aliyun__metric as metric
# 账号
ACCOUNTS = [
{
# 阿里云 AK
'akId' : '阿里云 AK ID',
'akSecret': '阿里云 AK Secret',
# 来自本账号的数据都自动添加以下标签
'extraTags': { 'account_name': 'metric_example' },
},
]
# 配置
OPTIONS = {
# ECS
'acs_ecs_dashboard': {
# 可以指定仅提取部分监控项目
'only': [
'CPUUtilization', # 直接指定需要的监控项(默认取第一个数值项,此处将获取 Average)
lambda m: m.endswith('BPS'), # 使用函数指定(返回 True 表示需要提取,默认取第一个数值项,此处将获取 Average)
re.compile('^Disk.+IOPS$'), # 使用正则表达式指定(能够匹配表示需要提取,默认取第一个数值项,此处将获取 Average)
{
# 支持复杂配置
'metric' : 'TotalCredit', # 指定需要提取的监控项目(同样支持直接指定、正则表达式、函数)
'statistic': 'Minimum', # 指定监控项值的统计方法
'convert' : lambda x: int(x), # 对监控项值进行处理
},
],
},
# 弹性公网 IP
'acs_vpc_eip': {
# 可以指定忽略的监控项
'ignore': [
'out_ratelimit_drop_speed', # 直接指定需要忽略的监控项
lambda m: m.endswith('.rate_percentage'), # 使用函数忽略(返回 True 表示忽略)
re.compile('^net\.tx'), # 使用正则表达式忽略(能够匹配表示忽略)
],
},
# CDN
'acs_cdn': {}, # 空配置,取所有监控项第一个数值项
}
@DFF.API('同步阿里云监控数据', fixed_crontab='* * * * *')
def run():
# 查询云监控数据
data = metric.query(ACCOUNTS, OPTIONS)
if _DFF_ORIGIN != 'crontab':
# 调试运行时:打印部分数据
print('Data to write:', json.dumps(data[0:3], indent=2))
else:
# 自动触发运行时:写入 DataKit
if not data:
return print('No data')
dk_res = DFF.CONN('datakit').write_metric_many(data=data)
print('DataKit response:', dk_res)
|
4.2 日志服务(SLS)
Python |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 | import json
import dataflux_aliyun__sls as sls
# 账号
ACCOUNTS = [
{
# 阿里云 AK
'akId' : '阿里云 AK ID',
'akSecret': '阿里云 AK Secret',
# 来自本账号的数据都自动添加以下标签
'extraTags': { 'account_name': 'sls_example' },
},
]
# 配置
OPTIONS = {
# 接入点
'cn-hangzhou.log.aliyuncs.com': {
# 项目名
'slsaudit-region-1947705082861112-cn-hangzhou': [
# 日志库
'oss_log', # 直接指定日志库(默认原样获取)
{
# 支持复杂配置
'logstore': 'internal-etl-log',
'topic': None, # 指定 topic
'query': 'SELECT COUNT(*) AS count', # 指定 query
# 对日志内容进行处理
'convert': lambda s: { 'logCount': 'nodata' if not s else s.get('count') },
},
],
},
}
@DFF.API('同步阿里云日志数据', fixed_crontab='* * * * *')
def run():
# 查询日志服务数据
data = sls.query(ACCOUNTS, OPTIONS)
if _DFF_ORIGIN != 'crontab':
# 调试运行时:打印部分数据
print('Data to write:', json.dumps(data[0:3], indent=2))
else:
# 自动触发运行时:写入 DataKit
if not data:
return print('No data')
dk_res = DFF.CONN('datakit').write_logging_many(data=data)
print('DataKit response:', dk_res)
|
5. 创建定时任务(旧版:自动触发配置)
在完成代码编写后,发布脚本。
之后在「管理 / 定时任务(旧版:自动触发配置)」中,为已编写的脚本创建定时任务(旧版:自动触发配置)即可。
X. 附录
本脚本中所涉及的相关配置项目,根据本附录查询。
X.1 可用的阿里云监控项
完整列表索引请参考阿里云官方文档:
阿里云监控 / 附录 1 云服务监控项 / 概览
如上述阿里云官方文档地址发生改变,可尝试使用以下关键字搜索此文档:
Text Only |
---|
| 概览 监控项索引表 site:help.aliyun.com
|