当前位置: 首页 > 后端技术 > Python

使用Python编写并提交Argo工作流程

时间:2023-03-26 17:32:07 Python

10人将获得CNCF商店价值100美元的礼券!你填了吗?问卷链接(https://www.wjx.cn/jq/9714648...)作者:AlexCollinsPython是一种流行的编程语言,供用户在Kubernetes上编写机器学习工作流。Argo不提供对Python开箱即用的一流支持。相反,我们提供Java、Golang和PythonAPI客户端。但这对大多数用户来说还不够。许多用户需要一个抽象层来添加组件和特定于用例的功能。今天你有两个选择。KFP编译器+Python客户端Argo工作流作为执行Kubeflow流水线的引擎。您可以定义一个Kubeflow管道并将其直接编译到Python中的Argo工作流中。然后,您可以使用ArgoPython客户端将工作流提交到Argo服务器API。这种方法允许您利用现有的Kubeflow组件。安装:pip3installkfppip3installargo-workflows示例:importkfpaskfpdefflip_coin():returnkfp.dsl.ContainerOp(name='Flipacoin',image='python:alpine3.6',command=['python','-c',"""importrandomres="heads"ifrandom.randint(0,1)==0else"tails"withopen('/output','w')asf:f.write(res)"""],file_outputs={'output':'/output'})defheads():returnkfp.dsl.ContainerOp(name='Heads',image="alpine:3.6",command=["sh","-c",'echo"itwasheads"'])deftails():returnkfp.dsl.ContainerOp(name='Tails',image="alpine:3.6",command=["sh","-c",'echo"这是尾巴"'])@kfp.dsl.pipeline(name='Coin-flip',description='Flipacoin')defcoin_flip_pipeline():flip=flip_coin()withkfp.dsl.Condition(flip.output=='heads'):heads()withkfp.dsl.Condition(flip.output=='tails'):tails()defmain():kfp.compiler.Compiler().compile(coin_flip_pipeline,__file__+".yaml")if__name__=='__main__':main()运行这个来创建你的工作流:apiVersion:argoproj.io/v1alpha1kind:Workflowmetadata:generateName:coin-flip-annotations:{pipelines.kubeflow.org/kfp_sdk_version:1.3.0,pipelines.kubeflow.org/pipeline_compilation_time:'2021-01-21T17:17:54.299235',pipelines.kubeflow.org/pipeline_spec:'{"description":"Flipacoin","name":"Coin-flip"}'}标签:{pipelines.kubeflow.org/kfp_sdk_version:1.3.0}spec:entrypoint:coin-fliptemplates:-name:coin-flipdag:tasks:-name:condition-1template:condition-1when:'"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}"=="heads"'依赖项:[flip-a-coin]-名称:condition-2模板:condition-2当:'"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}"=="tails"'依赖项:[flip-a-coin]-{name:flip-a-coin,template:flip-a-coin}-name:condition-1dag:tasks:-{name}:heads,template:heads}-name:condition-2dag:tasks:-{name:tails,template:tails}-name:flip-a-coincontainer:command:-python--c-"\nimportrandom\nres=\"heads\"ifrandom.randint(0,1)==0else\"tails\"\\nwithopen('/output','w')asf:\nf.write(res)\n"图像:python:alpine3.6输出:参数:-名称:翻转硬币输出值来源:{路径:/输出}工件:-{名称:翻转硬币输出,路径:/输出}-名称:heads容器:命令:[sh,-c,echo"itwasheads"]image:alpine:3.6-name:tailscontainer:command:[sh,-c,echo"itwastails"]image:alpine:3.6arguments:parameters:[]serviceAccountName:pipeline-runner注意,Kubeflow不支持这种方法可以使用客户端提交以上工作流,如下:importyamlfromargo.workflows.clientimport(ApiClient,WorkflowServiceApi,Configuration,V1alpha1WorkflowCreateRequest)defmain():config=Configuration(host="http://localhost:2746")client=ApiClient(configuration=config)service=WorkflowServiceApi(api_client=client)withopen("coin-flip.py.yaml")asf:manifest:dict=yaml.safe_load(f)delmanifest['spec']['serviceAccountName']service.create_workflow('argo',V1alpha1WorkflowCreateRequest(workflow=manifest))if__name__=='__main__':main()CoulerCouler是一个流行的项目,它允许您以与平台无关的方式指定工作流,但主要支持Argo工作流(未来计划支持Kubeflow和AirFlow):安装:pip3installgit+https://github.com/couler-proj/couler示例:importcouler.argoascoulerfromcouler.argo_submitterimportArgoSubmitterdefrandom_code():importrandomres="heads"ifrandom.randint(0,1)==0else"t病痛原则t(res)defflip_coin():returncouler.run_script(image="python:alpine3.6",source=random_code)defheads():returncouler.run_container(image="alpine:3.6",command=["sh","-c",'echo"这是头"'])deftails():returncouler.run_container(image="alpine:3.6",command=["sh","-c",'echo“这是尾巴”'])result=flip_coin()couler.when(couler.equal(result,"heads"),lambda:heads())couler.when(couler.equal(result,"tails"),lambda:tails())submitter=ArgoSubmitter()couler.run(submitter=submitter)这会创建以下工作流程:apiVersion:argoproj.io/v1alpha1kind:Workflowmetadata:generateName:couler-example-spec:templates:-name:couler-示例步骤:--名称:flip-coin-29模板:flip-coin--名称:heads-31模板:正面当:'{{steps.flip-coin-29.outputs.result}}==heads'-名称:tails-32模板:tails时间:'{{steps.flip-coin-29.outputs.result}}==tails'-name:flip-coinscript:name:''image:'python:alpine3.6'command:-pythonsource:|importrandomres="heads"ifrandom.randint(0,1)==0else"tails"print(res)-name:headscontainer:image:'alpine:3.6'命令:-sh-'-c'-echo"itwasheads"-name:tailscontainer:image:'alpine:3.6'command:-sh-'-c'-echo"itwastails"entrypoint:couler-examplettlStrategy:secondsAfterCompletion:600activeDeadlineSeconds:300点击阅读本站CNCF(CloudNativeComputingFoundation)原文成立于2015年12月,隶属于Linux基金会,是一个非盈利组织。CNCF(CloudNativeComputingFoundation)致力于培育和维护供应商中立的开源生态系统,以推广云原生技术。我们通过将最前沿的模型大众化,使大众能够接触到这些创新。扫描二维码关注CNCF微信公众号。