Python 调用 Jenkins 接口的自动发布脚本

#!/opt/venv/bin/python
# coding: utf-8
# author: —

import jenkins
import sys
import threading
import time

# Per-environment Jenkins connection settings, keyed by environment name.
# The environment name is the prefix of a job name ("qa-job1" -> "qa").
url_dict = {
    'qa': 'http://xx.xx.xx.xx/jenkins/',
    'jq': 'http://xx.xx.xx.xx/jenkins',
    'prd': 'http://xx.xx.xx.xx/jenkins',
}

username_dict = {'qa': 'admin', 'jq': 'admin', 'prd': 'admin'}

# NOTE(review): API tokens are hard-coded in source; consider loading them
# from the environment or a secrets store instead of committing them.
token_dict = {
    'qa': '009753b3087cdc9be9c76c9a40f0f99a',
    'jq': '5a2ded6a567f5b76d23e8e0c241fc478',
    'prd': '5a2ded6a567f5b76d23e8e0c241fc478',
}

# Separator used to re-join the parts of a job name.
# NOTE: this name shadows the builtin ``str``; it is kept unchanged because
# other code in this file reads it.
str = "-"

# Maps the release mode given on the command line to the Jenkins job that
# performs the branch checkout for that mode.
env2job_dict = {
    'qa-routine': "checkout_rls",
    'qa-emergy': "checkout_full",
    'prd': "checkout_zxq_rls2rls",
}

# Usage text printed for --help.
# NOTE: this name shadows the builtin ``help``; kept for compatibility.
help = '''For example:
#不切分支
./jenkins_deploy.py rebuild qa-job1,qa-job2,qa-job3
#qa例行发布
./jenkins_deploy.py qa-routine qa-job1,qa-job2,qa-job3,qa-job4
#qa紧急发布
./jenkins_deploy.py qa-emergy qa-job1,qa-job2,qa-job3
#生产、预生产发布
./jenkins_deploy.py prd prd-job1,prd-job2,prd-job3,prd-job4'''

class myjenkins:
    """Small wrapper around a ``jenkins.Jenkins`` client for one environment.

    The client is created from the module-level ``url_dict``,
    ``username_dict`` and ``token_dict`` tables.
    """

    # Seconds to wait between two polls of a build that has not finished.
    poll_interval = 1

    def __init__(self, env):
        """Create the Jenkins client for *env*.

        Args:
            env: Environment key, looked up in ``url_dict``,
                ``username_dict`` and ``token_dict``.

        Raises:
            KeyError: If *env* is missing from any of the three tables.
        """
        self.server = jenkins.Jenkins(url_dict[env],
                                      username=username_dict[env],
                                      password=token_dict[env])

    def build_job(self, job_name, param_dict=''):
        """Trigger *job_name*, wait for the build to finish, return its result.

        Args:
            job_name: Name of the Jenkins job to trigger.
            param_dict: Build parameters. Any falsy value (the default ``''``,
                ``None`` or an empty dict) triggers a build without parameters.

        Returns:
            The ``result`` field of the finished build as reported by
            ``get_build_info``.

        NOTE: the build number is read *before* the build is triggered, so
        two callers triggering the same job at nearly the same time can
        observe the same number and end up watching the same build.
        """
        next_build_num = self.server.get_job_info(job_name)['nextBuildNumber']
        if param_dict:
            self.server.build_job(job_name, parameters=param_dict)
        else:
            self.server.build_job(job_name)

        while True:
            try:
                result = self.server.get_build_info(job_name, next_build_num)
            except Exception:
                # The build may still be queued and not exist yet; any lookup
                # failure is treated as "not ready" and retried. Catching
                # Exception (not a bare except) keeps Ctrl-C working.
                result = None
            if (result is not None
                    and result.get('result') is not None
                    and not result.get('building', True)):
                break
            # Pause between polls instead of hammering the server.
            time.sleep(self.poll_interval)

        print("%s(%s) build %s" % (job_name, result['displayName'], result['result']))
        return result['result']

    @staticmethod
    def usage():
        """Print the module-level usage text."""
        print(help)

# Site-specific job-name suffixes that are not part of the project name
# (the "newrvm" jobs are the special case). Longest first, so that
# "-SH-Old" is matched before the shorter "-SH".
_SITE_SUFFIXES = ('-SH-Auding', '-SH-Old', '-BJ', '-SH', '-GB')


def _project_name_from_job(job_name):
    """Return the project name for a job named ``<env>-<project>[<suffix>]``."""
    project_name = job_name.partition('-')[2]
    for suffix in _SITE_SUFFIXES:
        if project_name.endswith(suffix):
            # Remove the suffix as a whole. str.strip() would instead strip
            # any of its characters from both ends and mangle the name.
            return project_name[:-len(suffix)]
    return project_name


def func(job_name, branch_do):
    """Deploy one Jenkins job, optionally running a branch checkout first.

    Args:
        job_name: Jenkins job name of the form ``<env>-<project>``; the part
            before the first ``-`` selects the Jenkins server.
        branch_do: ``'rebuild'`` to build without switching branches,
            otherwise a key of ``env2job_dict`` naming the checkout job to
            run before the build.

    Raises:
        KeyError: If the environment prefix or *branch_do* is not configured.
    """
    job_name = job_name.strip()
    env = job_name.split('-')[0]
    project_name = _project_name_from_job(job_name)

    client = myjenkins(env)
    if branch_do != 'rebuild':
        checkout_result = client.build_job(env2job_dict[branch_do],
                                           {'ProjectName': project_name})
    else:
        # Nothing to check out; go straight to the build.
        checkout_result = 'SUCCESS'

    if checkout_result == 'SUCCESS':
        client.build_job(job_name)
    else:
        print("Checkout %s failed" % (job_name,))

def main():
    """Command-line entry point.

    Usage: ``jenkins_deploy.py <mode> <job1,job2,...>`` where ``<mode>`` is
    ``rebuild`` or a key of ``env2job_dict``. ``--help`` prints the usage
    text. Each job is deployed in its own thread.
    """
    args = sys.argv[1:]

    if args and args[0] == '--help':
        myjenkins.usage()
        sys.exit()

    if len(args) < 2:
        # Both the mode and the job list are required.
        myjenkins.usage()
        sys.exit(1)

    branch_do = args[0]
    # Ignore empty entries such as those produced by a trailing comma.
    jobs = [name.strip() for name in args[1].split(',') if name.strip()]

    workers = []
    for index, job_name in enumerate(jobs):
        if index:
            # Without this pause between launches, the checkout jobs would
            # pick up the same next_build_num.
            time.sleep(15)
        worker = threading.Thread(target=func, args=(job_name, branch_do))
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

# Run the deployment only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

发表评论

电子邮件地址不会被公开。 必填项已用*标注