전공영역 공부 기록

Kubeflow pipeline

악분 2025. 9. 1. 16:09
반응형

Kubeflow pipeline이란?

kubeflow pipeline은 workflow를 실행하고 상태를 관리합니다. workflow는 목표를 달성하기 위한 작업의 묶음을 말합니다. AI생태계에서는 데이터 수집, 데이터 전처리, AI모델 학습 등을 pipeline으로 실행합니다. 아래 그림은 AI모델을 만들고 model registry에 업로드하는 pipeline입니다.

pipeline 사용은 필수가 아니지만 단점보다 장점이 많기 때문에, pipeline을 사용하지 않은 것 상황보다 pipeline을 사용하는 상황이 많습니다.

pipeline을 사용하면 여러가지 장점이 있지만 대표적인 2가지 장점을 소개합니다.

1. 이력관리 그리고 재현성 확보: pipeline을 다시 실행하면 동일한 결과를 얻을 수 있는 재현성이 확보됩니다. 그리고 실행한 workflow는 이력이 남습니다. 이력과 재현성은 분석, 디버깅, 롤백 등의 여러 작업에 사용됩니다. 특히, AI모델을 만들 때 반복실험이 많은데 이력관리 재현성은 반복실험에 매우 많은 도움이 됩니다.

2. 유지보수 용이성: pipeline은 여러 작업의 묶음이기 때문에 작업이 여러 단계로 실행됩니다. 따라서 문제가 발생할 때 어디서 문제가 발생했는지 쉽게 확인할 수 있습니다. 또한 코드 수정이 필요할 때 변경이 필요한 단계 코드만 수정하면 됩니다.

kubeflow pipeline 특징

1. 컨테이너 기반 workflow 실행: workflow는 컨테이너로 실행하며, workflow 엔진에게 실행을 위임합니다. workflow 엔진은 default로 argo workflow를 사용합니다. 컨테이너 관리(스케쥴링, 실행 등)은 쿠버네티스가 합니다.

2. DAG 기반 실행 순서 관리: workflow 실행 순서를 DAG(비순환 그래프)로 관리합니다. 사이클이 없어 이전 과정으로 돌아가지 않습니다.

pipeline 컴퍼넌트

ml-pipeline가 pipeline을 관리합니다. 하지만, ml-pipeline은 직접 workflow 작업을 수행하지 않고 pipeline의 전체 구조(yaml)을 저장하고 관리합니다.

workflow실행은 workflow 엔진이 수행합니다. default로 argo workflow를 사용합니다.

ml-pipeline, workflow 이외에 artifact store, model registry, metadata, artifact store(object storage)등이 있습니다.

정리하면, pipeline 실행을 위해 ml-pipline, workflow controller과 여러 컴퍼넌트가 관여하게 됩니다.

pipeline 작성 방법

pipeline 예제는 저의 github을 참고해주세요

 

pipeline은 Python 코드로 작성하며, @dsl.pipeline 데코레이터를 사용해 함수를 pipeline으로 정의할 수 있습니다.

from kfp import dsl

@dsl.pipeline(
  name="mnist-sklearn-pipeline",
  description="A pipeline to train a sklearn model on the MNIST dataset"
)
def main():
  ...

 

pipeline에서 호출하는 각각의 함수는 @dsl.component 데코레이터로 감싸야 합니다. component로 감싼 함수가 실제 워크플로우의 작업 단위가 되며, 각 component는 Kubernetes에서 컨테이너로 실행됩니다.

 

component를 정의할 때는 다음을 지정할 수 있습니다:

  • 베이스 이미지: 컨테이너 실행 시 사용할 Docker 이미지
  • 패키지 목록: 컨테이너 부팅 후 설치할 Python 패키지들
from kfp import dsl

@dsl.component(  
  base_image='python:3.12',  
  packages_to_install=['numpy', 'scikit-learn==1.5.1', 'joblib', 'pandas'\]  
)  
def load_and_preprocess_data():  
  ...

 

component간 데이터 교환

component들(즉, 컨테이너들) 간에 데이터를 주고받을 때는 artifact 저장소를 활용합니다. artifact 저장소는 object storage로, pipeline 실행 중 생성되는 모든 데이터를 관리합니다.

 

artifact 저장소를 사용할 때는 목적(저장 또는 불러오기)을 구분해야 하고, 만약 데이터를 저장한다면 타입을 지정해야 합니다.

  • 데이터 저장: Output 타입을 사용하여 artifact 저장소에 데이터를 저장
  • 데이터 불러오기: ${component_name}.outputs['변수명'] 형식으로 저장된 데이터를 가져옴
  • 데이터 타입: 저장하는 데이터의 성격에 따라 적절한 artifact 타입을 선택 (Dataset, Model, Metrics 등)

 

artifact 저장소는 Pipeline 실행 결과에서 주황색 폴더 아이콘으로 표시되며, 각 아이콘을 클릭하면 저장된 데이터를 확인할 수 있습니다.

 

아래 예제는 load_and_preprocess_data 함수에서 처리한 데이터를 artifact에 저장하고, 다른 component에서 이를 불러와 사용하는 방법을 보여줍니다.

from kfp.dsl import Input, Dataset, Output, Model, Metrics, Artifact

@dsl.component(
  base_image='python:3.12',
  packages_to_install=['numpy', 'scikit-learn==1.5.1', 'joblib', 'pandas']
)
def load_and_preprocess_data(
    x_train_output: Output[Dataset],
    y_train_output: Output[Dataset],
    x_test_output: Output[Dataset],
    y_test_output: Output[Dataset],
    metrics : Output[Metrics]
):
  # /tmp에 임시 파일로 저장 후 shutil.move를 사용해 아티팩트 경로로 이동
  np.save("/tmp/x_train.npy", x_train)
  shutil.move("/tmp/x_train.npy", x_train_output.path)

@dsl.pipeline(
  name="mnist-sklearn-pipeline",
  description="A pipeline to train a sklearn model on the MNIST dataset"
)
def mnist_sklearn_pipeline():
  preprocess_task = load_and_preprocess_data()
  train_task = train_mnist_model(
    x_train_input=preprocess_task.outputs['x_train_output'],
    y_train_input=preprocess_task.outputs['y_train_output'],
    max_depth=10
  )  

 

AI 모델의 평가 결과는 Metrics 타입으로 저장했습니다.

@dsl.component(
  base_image='python:3.12',
  packages_to_install=['numpy', 'scikit-learn==1.5.1', 'joblib']
)
def evaluate_model(
  model: Input[Model],
  x_test_data: Input[Dataset],
  y_test_data: Input[Dataset],
  metrics: Output[Metrics]
):
  # Predictions
  y_pred = loaded_model.predict(x_test)
  y_pred_proba = loaded_model.predict_proba(x_test)

  # Calculate metrics
  accuracy = accuracy_score(y_test, y_pred)

  metrics.log_metric("accuracy", round(accuracy, 4))
  metrics.log_metric("loss", round(loss, 4))

 

pipeline 실행 결과에서 메트릭은 수치로 표시되어 모델 성능을 쉽게 확인할 수 있습니다.

 

pipeline 등록 방법

파이썬으로 작성한 pipeline을 실행하려면, kubeflow에 pipeline을 등록해야 합니다. ml-pipeline API를 호하여 pipeline을 등록해도 되고, pipeline을 yaml로 변환하고 kuebeflow대시보드에서 pipeline을 등록해도 됩니다.

 

pipeline을 yaml로 변환하는 과정은 complie함수를 실행하면 됩니다.

import kfp

@dsl.pipeline(
  name="mnist-sklearn-pipeline",
  description="A pipeline to train a sklearn model on the MNIST dataset"
)
def mnist_sklearn_pipeline():
  ...

kfp.compiler.Compiler().compile(
  pipeline_func=mnist_sklearn_pipeline,
  package_path='mnist_sklearn_pipeline.yaml'
)

 

pipeline 실행 방법

pipeline을 등록했으면 실행할 준비가 거의 완료되었습니다. 마지막 준비는 Experiment(실험)을 생성해야 합니다. Experiment는 pipeline 실행 이력을 관리합니다.

 

Experiment 생성방법은 매우 간단합니다. 이름만 입력하면 됩니다.

 

experiment 생성이 완료되면 드디어 pipeline을 실행할 수 있습니다. pipeline에서 "Create run"버튼을 클릭하여 pipeline실행하면 됩니다.

 

pipeline의 실행결과는 experiment에 기록됩니다.

반응형