电脑桌面
添加蚂蚁七词文库到电脑桌面
安装后可以在桌面快捷访问

一文全面了解“基于k8s的调度系统Pipeline”技术

来源:金蝶云社区作者:金蝶2024-09-238

一文全面了解“基于k8s的调度系统Pipeline”技术


1 面向对象


Kubeflow Pipelines作为一种新型的调度器,与其他调度器相比,可以更好地支持机器学习相关的工作流


机器学习主要涉及数据、训练、保存模型、部署模型等几个重要流程。Pipeline提供了一个web管理界面替换,供算法工程师自定义流程,而且整个过程运行在k8s集群中。


对于资源利用率有一定要求的公司,统一使用k8s上的调度器是一个不错的选择。它不仅仅适用于机器学习,也同样适用日常的工作流调度。


2 业务场景


当有以下需求时,基于k8s的调度系统Pipeline技术将会是你的得力助手:


  • 需要用到工作流以及定时调度

  • 机器学习相关任务;

  • 调度系统云原生管理


如:星空已上线的数据智能服务,包含智能销售预测、销售订单风险、应收坏账风险应用等。


3 架构图


Pipeline技术的架构图如下:


Pipeline技术架构


通过上图可以看出,Pipeline的技术架构主要由八个部分组成:


  • Python SDK: 用于创建Kubeflow Pipelines 组件的特定语言(DSL);

  • DSL Compiler: 将Python代码转换成YAML静态配置文件(DSL编译器);

  • Pipeline Web Server: Pipeline的前端服务,收集各种数据以显示相关视图:当前正在运行的Pipeline列表、Pipeline执行的历史记录,有关各个Pipeline运行的调试信息和执行状态等;

  • Pipeline Service:Pipeline的后端服务,调用K8S服务,从YAML创建 Pipeline运行;

  • Kubernetes Resources:创建CRDs运行Pipeline;

  • Machine Learning Metadata Service: 用于监视由Pipeline Service创建的Kubernetes资源,并可以将这些资源的状态持久在保留ML元数据服务中(存储任务流容器之间的input/output数据交互);

  • Artifact Storage:用于存储Metadata和Artifact。Kubeflow Pipelines可以将元数据存储在MySQL数据库中,也可以将工件制品存储在Minio服务器或S3等工件存储中;

  • Orchestration controllers:对任务的编排,比如Argo Workflow控制器,可以协调任务驱动的工作流。


4 解决方案


当我们想要创建四个工作流组件时,效果如下图:


工作流组件


想要实现上述效果,可以通过以下6完成:


Step1:在k8s部署Pipeline,详细部署方式参考Pipeline官网:https://github.com/kubeflow/pipelines。


Step2:定义任务运行基础镜像,如custom_python3:latest,需要包含脚本中python所用模块,编写dockerfile,具体代码如下:


FROM python:3.7-slim 
WORKDIR /app 
RUN pip install -U scikit-learn numpy 
COPY preprocess.py ./preprocess.py 
ENTRYPOINT [ "python", "preprocess.py" ]


运行代码后生成镜像,推到远程仓库。


Step3:编写流程脚本,脚本是具体业务脚本逻辑,与Pipeline本身无关,如:Preprocess-data流程脚本。


import numpy as np 
from sklearn import datasets 
from sklearn.model_selection import train_test_split 
def _preprocess_data(): 
     X, y = datasets.load_boston(return_X_y=True) 
     X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33) 
     np.save('x_train.npy', X_train) 
     np.save('x_test.npy', X_test) 
     np.save('y_train.npy', y_train) 
     np.save('y_test.npy', y_test) 
if __name__ == '__main__': 
     print('Preprocessing data...') 
     _preprocess_data()


Step4:构建工作流 pipeline.py


import kfp 
from kfp import dsl 
def preprocess_op(): 
    return dsl.ContainerOp( 
        name='Preprocess Data', 
        image='wintfru/boston_pipeline_preprocess:v1', 
        arguments=[], 
        file_outputs={ 
            'x_train': '/app/x_train.npy', 
            'x_test': '/app/x_test.npy', 
            'y_train': '/app/y_train.npy', 
            'y_test': '/app/y_test.npy', 
        } 
    ) 
def train_op(x_train, y_train): 
    return dsl.ContainerOp( 
        name='Train Model', 
        image='wintfru/boston_pipeline_train:v1', 
        arguments=[ 
            '--x_train', x_train, 
            '--y_train', y_train 
        ], 
        file_outputs={ 
            'model': '/app/model.pkl' 
        } 
    ) 
def test_op(x_test, y_test, model): 
    return dsl.ContainerOp( 
        name='Test Model', 
        image='wintfru/boston_pipeline_test:v1', 
        arguments=[ 
            '--x_test', x_test, 
            '--y_test', y_test, 
            '--model', 

一文全面了解“基于k8s的调度系统Pipeline”技术

1 面向对象Kubeflow Pipelines作为一种新型的调度器,与其他调度器相比,可以更好地支持机器学习相关的工作流。机器学习主要涉及数据、训...
点击下载文档文档为doc格式

声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。

确认删除?
回到顶部
客服QQ
  • 客服QQ点击这里给我发消息
QQ群
  • 答案:my7c点击这里加入QQ群
支持邮箱
微信
  • 微信