1. 背景
一般情况下,采集器从云厂商处获取资源后仅提取部分普遍重要的属性做 tags,如此对于一些用户来说还不够。本文将介绍如何为采集后(上报前)的数据补充额外的 tags。
2. 方案
在不修改官方采集器的前提下,采集器本身提供了 after_collect
参数,用户可赋值一个函数,对采集后的数据做二次处理,其中就包括添加额外的 tags。
Python |
---|
| def handler(point):
point['tags']['origin'] = 'shanghai'
return point
@DFF.API('xxx Collection', timeout=3600, fixed_crontab='* * * * *')
def run():
Runner(main.DataCollector(account, collector_configs, after_collect=handler), debug=True).run()
|
上面示例省略了无关配置,重点关注 handler
函数,该函数仅支持一个参数 point
, point
是采集器即将上报的数据,数据结构可参考相关采集器文档「数据上报格式」,可以肯定的是point
一定包含三个字段分别是measurement
、tags
、fields
(需要详细了解的同学,可自行查找行协议相关文档)。我们重点关注的就是point.tags
字段,将待补充的健值对插入 tags
中即可,示例中相当于给 point.tags
添加一个key
为origin
,value
为shanghai
的健值对。
3. 案例
将 AWS 控制台配置的 EC2 tags
,补充到采集器采集的 EC2 对象数据的tags
中
情景一:直接从point.fields
中提取Tags
字段补充到point.tags
Python |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 | account = {
'ak_id' : DFF.ENV('aws_develop_test')['ak_id'],
'ak_secret' : DFF.ENV('aws_develop_test')['ak_secret'],
}
collector_configs = {
'regions': ['cn-northwest-1']
}
from guance_integration__runner import Runner
import guance_aws_ec2__main as main
from guance_integration__utils import json_loads
def add_tags(point):
# 如果 point.fields 中存在云资源的 Tags,直接取
cloud_tags = json_loads(point['fields'].get('Tags'))
if not cloud_tags:
return point
for t in cloud_tags:
t_key = t['Key']
t_v = t['Value']
# 对于已经存在的 tags 不需要被替换(区分大小写)
protected_tags = [k.lower() for k in point['tags'].keys()]
if t_key.lower() in protected_tags:
continue
# 对于一些双下划线开头结尾的 tags 谨慎补充,以下是禁止补充
if t_key.startswith('__') and t_key.endswith('__'):
continue
point['tags'][t_key] = t_v
return point
@DFF.API('AWS-EC2 Collection', timeout=3600, fixed_crontab='*/15 * * * *')
def run():
Runner(main.DataCollector(account, collector_configs, after_collect=add_tags)).run()
|
情景二:并非所有采集器的 point.fields
中都存在Tags
字段(持续支持中。..),如果没有支持,需要从云厂商开放的 API 中获取(也可能客户自己的 API):
Python |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 | account = {
'ak_id' : DFF.ENV('aws_develop_test')['ak_id'],
'ak_secret' : DFF.ENV('aws_develop_test')['ak_secret'],
}
# collector configuration
collector_configs = {
'regions': ['cn-northwest-1']
}
from guance_integration__runner import Runner
import guance_aws_ec2__main as main
from guance_integration__utils import json_loads
from guance_integration__client import AWS
def add_tags(point):
# 如果 point.fields 中不存在云资源的 Tags,可以调用云 API 获取
client = AWS(**account)
region_id = point['tags']['RegionId']
instance_id = point['tags']['InstanceId']
biz_params = {
'Filters': [
{
'Name': 'resource-id',
'Values': [
instance_id,
]
}
]
}
api_res = client.do_api(action='describe_tags', product='ec2', region_id=region_id, **biz_params)
if not api_res:
return point
cloud_tags = api_res.get('Tags')
if not cloud_tags:
return point
for t in cloud_tags:
t_key = t['Key']
t_v = t['Value']
# 对于已经存在的 tags 不需要被替换(区分大小写)
protected_tags = [k.lower() for k in point['tags'].keys()]
if t_key.lower() in protected_tags:
continue
# 对于一些双下划线开头结尾的 tags key 谨慎补充,该 demo 直接禁止补充
if t_key.startswith('__') and t_key.endswith('__'):
continue
point['tags'][t_key] = t_v
return point
@DFF.API('AWS-EC2 Collection', timeout=3600, fixed_crontab='*/15 * * * *')
def run():
Runner(main.DataCollector(account, collector_configs, after_collect=add_tags)).run()
|
4. 重点注意事项
- 云产品采集器中,自定义对象 tags 会自动补充到关联的指标 tags 中,所以如果您既开启自定义对象采集器,又开启了云监控采集器,需要补充 tags 时,只需要补充给对象采集器即可。
- 为采集器上报数据补充 tags 要特别注意,有的字段不能被覆盖,比如自定义对象的
name
字段,建议像案例一样,如果原数据 tags 存在相同的 key 就不要再补充进去了,防止出现意外情况。
after_collect
所赋值的函数只接收一个参数point
,处理过point
后函数必须返回一个/多个point
,如果没有返回或者处理过程中抛错,一律按照未经函数处理的原始数据上报,当定义了after_collect
函数无效时,首先排查这种可能。