pipeline과 cancel, recovery를 상태로 다루기

fetch, validate, load, aggregate 같은 데이터 작업 단계를 상태 전이와 recovery 관점에서 어떻게 바라볼 수 있는지 정리한다.

앞 글에서는 queue, lease, lock이 왜 같이 등장하는지 정리했다. 작업을 안전하게 가져오고, worker가 죽었을 때 복구할 수 있게 하고, 같은 대상의 중복 실행을 막기 위한 장치였다.

이번 글의 질문은 실패와 취소의 판단 경계를 어디에 둘 것인가다.

그 다음에 남는 문제는 작업 내부의 흐름이다. 데이터 작업은 보통 하나의 명령으로 끝나지 않는다.

예를 들어 외부 API에서 주문 데이터를 가져와 내부 분석 테이블로 적재하는 작업이라면 이런 흐름이 자연스럽다.

fetch
-> validate
-> load
-> aggregate

여기서 중요한 전제는 job의 입력 경계가 고정되어 있어야 한다는 점이다. 예를 들어 “상점 A의 2026-05-06 00:00~01:00 주문 데이터”처럼 대상과 시간 구간이 정해져 있거나, “cursor snapshot X부터 Y까지”처럼 다시 실행해도 같은 입력 집합을 읽는 기준이 있어야 한다.

이 경계가 없으면 recovery 이야기가 흐려진다. worker가 죽은 뒤 다시 실행했을 때 읽는 주문 집합이 달라진다면, 같은 job을 재시도하는 것인지 새로운 데이터를 처리하는 것인지 구분하기 어렵다.

처음에는 이 순서를 script 안에 넣어도 될 것 같았다. script가 API를 호출하고, 성공하면 스키마를 검증하고, 그 다음 warehouse에 적재하고, 마지막으로 집계 테이블을 갱신하는 식이다.

하지만 그렇게 하면 애플리케이션은 전체 흐름을 알기 어렵다. 어디까지 성공했는지, 어떤 단계에서 멈췄는지, 취소 요청을 어디서 반영해야 하는지가 다시 애매해진다.

그래서 실행 순서를 애플리케이션 쪽의 pipeline으로 끌어올린다.

plan = [FETCH, VALIDATE, LOAD, AGGREGATE]

for step in plan:
    if cancel_requested():
        mark_cancel_requested()
        break

    result = run(step)

    if result.failed:
        mark_failed()
        break

이 슈도 코드는 단순하지만 중요한 기준을 담고 있다.

  • 순서는 script 내부가 아니라 controller가 안다
  • 실패하면 다음 단계를 실행하지 않는다
  • 취소는 step 경계에서 확인한다
  • 마지막 결과는 job 또는 execution 상태로 영속 저장한다

여기까지 정리하면 pipeline은 단순한 함수 호출 목록이 아니다. 어떤 단계까지 진행됐고, 어디서 멈췄고, 다음 단계로 넘어가도 되는지를 남기는 실행 기록에 가깝다.

여기서 cancel은 생각보다 미묘하다. 실행 중인 process를 즉시 죽일 수도 있고, 다음 step을 시작하지 않는 수준에서 멈출 수도 있다.

처음부터 모든 걸 처리하려고 하면 구조가 커진다. 그래서 단계적으로 볼 수 있다.

Phase A: step 시작 전 cancel 확인
Phase B: 실행 중 process interrupt
Phase C: timeout, retry, cleanup 정책

이렇게 나누면 현재 단계에서 무엇을 보장하는지 선명해진다. 예를 들어 step 경계 cancel만 지원한다면, 이미 실행 중인 fetch process는 즉시 죽이지 않는다. 대신 다음 validate를 시작하기 전에 취소를 확인한다.

FETCH 실행 중 cancel 요청
-> FETCH 종료
-> VALIDATE 시작 전 cancel 확인
-> CANCELLED

이 모델은 완벽한 즉시 중단은 아니지만, 상태 기록은 명확하게 만든다. 다만 cancel 요청이 들어왔다완전히 안전하게 취소됐다는 같은 말이 아니다. 이미 FETCH가 일부 진행됐다면 임시 파일이나 staging row 같은 중간 결과가 남아 있을 수 있다. 그래서 실제 시스템에서는 cancel reason, 마지막 완료 step, cleanup 필요 여부 같은 보조 정보가 함께 필요해질 수 있다.

복구도 비슷하다. worker가 죽었을 때 모든 상태를 같은 방식으로 처리하면 안 된다.

QUEUED
  아직 아무도 가져가지 않음

LEASED
  worker가 가져갔지만 실행 시작 전일 수 있음

RUNNING
  실제 실행 중이었음

STALE
  실행 중이었지만 lease나 heartbeat 기준으로 더 이상 정상 진행을 믿기 어려움

그래서 recovery 규칙도 상태별로 달라진다.

LEASED + lease 만료 + cancel 없음
-> QUEUED

LEASED + lease 만료 + cancel 있음
-> CANCELLED

RUNNING + lease 만료
-> STALE

이 규칙이 필요한 이유는, 가져갔지만 시작 못 한 작업실행 중 죽은 작업은 의미가 다르기 때문이다. 전자는 다시 대기열에 넣어도 되지만, 후자는 외부 side effect가 있었을 수 있다.

다만 여기에도 전제가 있다. LEASED -> QUEUED는 실행이 실제로 시작되지 않았다는 경계가 분명할 때 안전하다. 장시간 RUNNING 작업은 lease가 만료되기 전에 heartbeat나 lease renewal로 “아직 살아 있다”는 신호를 갱신해야 한다. 그렇지 않으면 정상 실행 중인 작업을 죽은 작업으로 오해할 수 있다.

예를 들어 load 중간에 process가 죽었다면 staging table 일부가 이미 바뀌었을 수 있다. 이런 작업을 조용히 다시 QUEUED로 돌리면 더 위험할 수 있다. 특히 입력 경계가 고정되어 있지 않다면 재실행 시 읽는 주문 집합까지 달라질 수 있다. 그래서 STALE처럼 운영자가 볼 수 있는 상태로 남기는 편이 낫다.

여기서 상태 모델의 역할이 분명해진다. 상태 모델은 모든 문제를 자동으로 해결하지 않는다. 대신 위험한 애매함을 드러낸다.

이 작업은 다시 실행해도 되는가
아니면 사람이 확인해야 하는가
취소된 것인가
실패한 것인가
실행 중 죽은 것인가

이 질문에 답하려면 상태 이름이 필요하다.

그래서 이 구조는 단순한 FSM 놀이가 아니다. 데이터 작업이 남기는 side effect와 운영자가 판단해야 할 경계를 분리하기 위한 장치다.

정리하고 나면 다음 질문이 남는다.

  • 그렇다면 이런 상태 모델은 모든 데이터 작업에 넣어야 하는가

이어서 읽기

작성 수정