포스트

Celery 에 대해 알아보자 03


Celery 고급 기능: 우선순위 큐, 그루핑, 병렬성 제한

Celery는 분산 작업 처리를 위한 강력한 도구입니다. 이 글에서는 Celery의 고급 기능 중 우선순위 큐, 그루핑, 그리고 병렬성 제한에 대해 자세히 알아보겠습니다.

1. 우선순위 큐 (Priority Queues)

우선순위 큐를 사용하면 중요한 작업을 먼저 처리할 수 있습니다. Celery에서는 메시지 브로커의 우선순위 큐 지원 여부에 따라 구현 방식이 달라집니다.

구현 방법

  1. Celery 설정에서 우선순위 범위 정의:
1
2
app.conf.task_queue_max_priority = 10
app.conf.task_default_priority = 5
  1. 큐 정의 시 우선순위 설정:
1
2
3
4
5
6
7
from kombu import Queue

app.conf.task_queues = [
    Queue('high', routing_key='high', queue_arguments={'x-max-priority': 10}),
    Queue('default', routing_key='default', queue_arguments={'x-max-priority': 5}),
    Queue('low', routing_key='low', queue_arguments={'x-max-priority': 1})
]
  1. 태스크에 우선순위 할당:
1
2
3
4
5
6
7
8
9
10
11
@app.task(priority=10)
def high_priority_task():
    pass

@app.task(priority=5)
def default_priority_task():
    pass

@app.task(priority=1)
def low_priority_task():
    pass

주의사항

  • 모든 메시지 브로커가 우선순위 큐를 지원하는 것은 아닙니다. (예: Redis는 지원하지 않음)
  • 우선순위는 상대적이며, 시스템 부하에 따라 효과가 달라질 수 있습니다.

2. 그루핑 (Grouping)

그루핑은 여러 작업을 하나의 그룹으로 묶어 실행하고 결과를 한 번에 처리할 수 있게 해주는 기능입니다.

기본 사용법

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from celery import group

# 여러 작업을 그룹으로 묶기
job = group([
    add.s(2, 2),
    multiply.s(4, 4),
    power.s(2, 8)
])

# 그룹 실행
result = job.apply_async()

# 모든 결과 가져오기
print(result.get())

고급 사용법: 체인과 결합

1
2
3
4
5
6
7
8
9
from celery import chain, group

# 그룹 결과를 다른 태스크의 입력으로 사용
workflow = chain(
    group([task1.s(), task2.s(), task3.s()]),
    collect_results.s()
)

result = workflow.apply_async()

주의사항

  • 그룹 내 작업 실패 처리: group(...).apply_async(throw=False)를 사용하여 일부 작업 실패를 허용할 수 있습니다.
  • 메모리 사용: 대규모 그룹은 메모리 사용량에 주의해야 합니다.

3. 병렬성 제한 (Concurrency Control)

Celery에서는 동시에 실행되는 작업의 수를 제한하여 시스템 리소스를 관리할 수 있습니다.

워커 레벨 제한

워커를 시작할 때 동시성을 제한할 수 있습니다:

1
celery -A your_app worker --concurrency=4

태스크 레벨 제한

특정 태스크의 동시 실행을 제한하려면 task_annotations를 사용합니다:

1
2
3
app.conf.task_annotations = {
    'tasks.expensive_task': {'rate_limit': '10/m'}
}

또는 태스크 데코레이터에서 직접 설정:

1
2
3
@app.task(rate_limit='10/m')
def expensive_task():
    pass

Semaphore를 사용한 세밀한 제어

더 세밀한 제어가 필요한 경우 Celery의 Semaphore를 사용할 수 있습니다:

1
2
3
4
5
6
7
8
9
from celery.concurrency import Semaphore

semaphore = Semaphore(5)  # 최대 5개의 동시 실행 허용

@app.task
def limited_task():
    with semaphore:
        # 제한된 동시성으로 실행되는 코드
        pass

주의사항

  • 과도한 제한은 처리량을 떨어뜨릴 수 있습니다.
  • 시스템 리소스와 작업 특성을 고려하여 적절한 제한을 설정해야 합니다.

결론

우선순위 큐, 그루핑, 병렬성 제한은 Celery를 사용하여 복잡한 작업 처리 시스템을 구축할 때 매우 유용한 고급 기능들입니다. 이러한 기능들을 적절히 활용하면 시스템의 효율성과 안정성을 크게 향상시킬 수 있습니다.

  • 우선순위 큐를 통해 중요한 작업을 우선적으로 처리할 수 있습니다.
  • 그루핑을 사용하여 관련된 여러 작업을 효율적으로 관리하고 실행할 수 있습니다.
  • 병렬성 제한을 통해 시스템 리소스를 효과적으로 관리하고 안정성을 확보할 수 있습니다.
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.