Celery 에 대해 알아보자 03
Celery 고급 기능: 우선순위 큐, 그루핑, 병렬성 제한
Celery는 분산 작업 처리를 위한 강력한 도구입니다. 이 글에서는 Celery의 고급 기능 중 우선순위 큐, 그루핑, 그리고 병렬성 제한에 대해 자세히 알아보겠습니다.
1. 우선순위 큐 (Priority Queues)
우선순위 큐를 사용하면 중요한 작업을 먼저 처리할 수 있습니다. Celery에서는 메시지 브로커의 우선순위 큐 지원 여부에 따라 구현 방식이 달라집니다.
구현 방법
- Celery 설정에서 우선순위 범위 정의:
1
2
app.conf.task_queue_max_priority = 10
app.conf.task_default_priority = 5
- 큐 정의 시 우선순위 설정:
1
2
3
4
5
6
7
from kombu import Queue
app.conf.task_queues = [
Queue('high', routing_key='high', queue_arguments={'x-max-priority': 10}),
Queue('default', routing_key='default', queue_arguments={'x-max-priority': 5}),
Queue('low', routing_key='low', queue_arguments={'x-max-priority': 1})
]
- 태스크에 우선순위 할당:
1
2
3
4
5
6
7
8
9
10
11
@app.task(priority=10)
def high_priority_task():
pass
@app.task(priority=5)
def default_priority_task():
pass
@app.task(priority=1)
def low_priority_task():
pass
주의사항
- 모든 메시지 브로커가 우선순위 큐를 지원하는 것은 아닙니다. (예: Redis는 지원하지 않음)
- 우선순위는 상대적이며, 시스템 부하에 따라 효과가 달라질 수 있습니다.
2. 그루핑 (Grouping)
그루핑은 여러 작업을 하나의 그룹으로 묶어 실행하고 결과를 한 번에 처리할 수 있게 해주는 기능입니다.
기본 사용법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from celery import group
# 여러 작업을 그룹으로 묶기
job = group([
add.s(2, 2),
multiply.s(4, 4),
power.s(2, 8)
])
# 그룹 실행
result = job.apply_async()
# 모든 결과 가져오기
print(result.get())
고급 사용법: 체인과 결합
1
2
3
4
5
6
7
8
9
from celery import chain, group
# 그룹 결과를 다른 태스크의 입력으로 사용
workflow = chain(
group([task1.s(), task2.s(), task3.s()]),
collect_results.s()
)
result = workflow.apply_async()
주의사항
- 그룹 내 작업 실패 처리:
group(...).apply_async(throw=False)
를 사용하여 일부 작업 실패를 허용할 수 있습니다. - 메모리 사용: 대규모 그룹은 메모리 사용량에 주의해야 합니다.
3. 병렬성 제한 (Concurrency Control)
Celery에서는 동시에 실행되는 작업의 수를 제한하여 시스템 리소스를 관리할 수 있습니다.
워커 레벨 제한
워커를 시작할 때 동시성을 제한할 수 있습니다:
1
celery -A your_app worker --concurrency=4
태스크 레벨 제한
특정 태스크의 동시 실행을 제한하려면 task_annotations
를 사용합니다:
1
2
3
app.conf.task_annotations = {
'tasks.expensive_task': {'rate_limit': '10/m'}
}
또는 태스크 데코레이터에서 직접 설정:
1
2
3
@app.task(rate_limit='10/m')
def expensive_task():
pass
Semaphore를 사용한 세밀한 제어
더 세밀한 제어가 필요한 경우 Celery의 Semaphore
를 사용할 수 있습니다:
1
2
3
4
5
6
7
8
9
from celery.concurrency import Semaphore
semaphore = Semaphore(5) # 최대 5개의 동시 실행 허용
@app.task
def limited_task():
with semaphore:
# 제한된 동시성으로 실행되는 코드
pass
주의사항
- 과도한 제한은 처리량을 떨어뜨릴 수 있습니다.
- 시스템 리소스와 작업 특성을 고려하여 적절한 제한을 설정해야 합니다.
결론
우선순위 큐, 그루핑, 병렬성 제한은 Celery를 사용하여 복잡한 작업 처리 시스템을 구축할 때 매우 유용한 고급 기능들입니다. 이러한 기능들을 적절히 활용하면 시스템의 효율성과 안정성을 크게 향상시킬 수 있습니다.
- 우선순위 큐를 통해 중요한 작업을 우선적으로 처리할 수 있습니다.
- 그루핑을 사용하여 관련된 여러 작업을 효율적으로 관리하고 실행할 수 있습니다.
- 병렬성 제한을 통해 시스템 리소스를 효과적으로 관리하고 안정성을 확보할 수 있습니다.
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.