钉钉告警机器人

#!/opt/venv/bin/python
#coding:utf-8
import json
import urllib2
from urllib2 import URLError
import sys
zabbix_addresses=[‘http://10.128.64.8/zabbix,ops_internetcar_r,Pass1q2w#’]
class ZabbixTools:
def __init__(self,address,username,password):

self.address = address
self.username = username
self.password = password

self.url = ‘%s/api_jsonrpc.php’ % self.address
self.header = {“Content-Type”:”application/json”}

def user_login(self):
data = json.dumps({
“jsonrpc”: “2.0”,
“method”: “user.login”,
“params”: {
“user”: self.username,
“password”: self.password
},
“id”: 0
})

request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])

try:
result = urllib2.urlopen(request)
except URLError as e:
print “Auth Failed, please Check your name and password:”, e.code
else:
response = json.loads(result.read())
result.close()
#print response[‘result’]
self.authID = response[‘result’]
return self.authID

def trigger_get(self):
data = json.dumps({
“jsonrpc”:”2.0″,
“method”:”trigger.get”,
“params”: {
“output”: [
“triggerid”,
“description”,
“priority”
],
“filter”: {
“value”: 1
},
“active”:0,
“expandData”:”hostname”,
“sortfield”: “priority”,
“sortorder”: “DESC”
},
“auth”: self.user_login(),
“id”:1
})

request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])

try:
result = urllib2.urlopen(request)
except URLError as e:
print “Error as “, e
else:
response = json.loads(result.read())
result.close()
issues = response[‘result’]
content = ”
if issues:
for line in issues:
triggerid=line[‘triggerid’]
host=self.host_get(triggerid)
content = content + “%s:%s\r\n” % (host,line[‘description’])
return content

def host_get(self,triggerid):
data = json.dumps({
“jsonrpc”:”2.0″,
“method”:”host.get”,
“params”: {
“triggerids”: triggerid
},
“auth”: self.user_login(),
“id”:1
})

request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])

try:
result = urllib2.urlopen(request)
except URLError as e:
print “Error as “, e
else:
response = json.loads(result.read())
result.close()
issues = response[‘result’]
host=issues[0][‘host’]
return host

def dd(context):
url=’https://oapi.dingtalk.com/robot/send?access_token=af6b900f50eb1d94c4bfa89413e7c375e7
b58629968a40320b65c8e2fcac1631′ con={“msgtype”:”text”,”text”:{“content”:context}}
jd=json.dumps(con)
req=urllib2.Request(url,jd)
req.add_header(‘Content-Type’, ‘application/json’)
response=urllib2.urlopen(req)
print(response.read())
if __name__ == “__main__”:
for zabbix_addres in zabbix_addresses:
address,username,password = zabbix_addres.split(‘,’)
z = ZabbixTools(address=address, username=username, password=password)
content = z.trigger_get()
print(content)
if content != ”:
dd(content)

print “Done!”