一文全面了解“基于k8s的调度系统Pipeline”技术

1 面向对象
Kubeflow Pipelines作为一种新型的调度器,与其他调度器相比,可以更好地支持机器学习相关的工作流。
机器学习主要涉及数据、训练、保存模型、部署模型等几个重要流程。Pipeline提供了一个web管理界面替换,供算法工程师自定义流程,而且整个过程运行在k8s集群中。
对于资源利用率有一定要求的公司,统一使用k8s上的调度器是一个不错的选择。它不仅仅适用于机器学习,也同样适用日常的工作流调度。
2 业务场景
当有以下需求时,基于k8s的调度系统Pipeline技术将会是你的得力助手:
需要用到工作流以及定时调度;
机器学习相关任务;
调度系统云原生管理。
如:星空已上线的数据智能服务,包含智能销售预测、销售订单风险、应收坏账风险应用等。
3 架构图
Pipeline技术的架构图如下:

Pipeline技术架构
通过上图可以看出,Pipeline的技术架构主要由八个部分组成:
Python SDK: 用于创建Kubeflow Pipelines 组件的特定语言(DSL);
DSL Compiler: 将Python代码转换成YAML静态配置文件(DSL编译器);
Pipeline Web Server: Pipeline的前端服务,收集各种数据以显示相关视图:当前正在运行的Pipeline列表、Pipeline执行的历史记录,有关各个Pipeline运行的调试信息和执行状态等;
Pipeline Service:Pipeline的后端服务,调用K8S服务,从YAML创建 Pipeline运行;
Kubernetes Resources:创建CRDs运行Pipeline;
Machine Learning Metadata Service: 用于监视由Pipeline Service创建的Kubernetes资源,并可以将这些资源的状态持久在保留ML元数据服务中(存储任务流容器之间的input/output数据交互);
Artifact Storage:用于存储Metadata和Artifact。Kubeflow Pipelines可以将元数据存储在MySQL数据库中,也可以将工件制品存储在Minio服务器或S3等工件存储中;
Orchestration controllers:对任务的编排,比如Argo Workflow控制器,可以协调任务驱动的工作流。
4 解决方案
当我们想要创建四个工作流组件时,效果如下图:

工作流组件
想要实现上述效果,可以通过以下6步完成:
Step1:在k8s部署Pipeline,详细部署方式参考Pipeline官网:https://github.com/kubeflow/pipelines。
Step2:定义任务运行基础镜像,如custom_python3:latest,需要包含脚本中python所用模块,编写dockerfile,具体代码如下:
FROM python:3.7-slim WORKDIR /app RUN pip install -U scikit-learn numpy COPY preprocess.py ./preprocess.py ENTRYPOINT [ "python", "preprocess.py" ]
运行代码后生成镜像,推到远程仓库。
Step3:编写流程脚本,脚本是具体业务脚本逻辑,与Pipeline本身无关,如:Preprocess-data流程脚本。
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
def _preprocess_data():
X, y = datasets.load_boston(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
np.save('x_train.npy', X_train)
np.save('x_test.npy', X_test)
np.save('y_train.npy', y_train)
np.save('y_test.npy', y_test)
if __name__ == '__main__':
print('Preprocessing data...')
_preprocess_data()Step4:构建工作流 pipeline.py。
import kfp
from kfp import dsl
def preprocess_op():
return dsl.ContainerOp(
name='Preprocess Data',
image='wintfru/boston_pipeline_preprocess:v1',
arguments=[],
file_outputs={
'x_train': '/app/x_train.npy',
'x_test': '/app/x_test.npy',
'y_train': '/app/y_train.npy',
'y_test': '/app/y_test.npy',
}
)
def train_op(x_train, y_train):
return dsl.ContainerOp(
name='Train Model',
image='wintfru/boston_pipeline_train:v1',
arguments=[
'--x_train', x_train,
'--y_train', y_train
],
file_outputs={
'model': '/app/model.pkl'
}
)
def test_op(x_test, y_test, model):
return dsl.ContainerOp(
name='Test Model',
image='wintfru/boston_pipeline_test:v1',
arguments=[
'--x_test', x_test,
'--y_test', y_test,
'--model', 一文全面了解“基于k8s的调度系统Pipeline”技术
声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。如若本站内容侵犯了原著者的合法权益,可联系本站删除。



