포스트

Celery 에 대해 알아보자 02

Celery Standalone 버전 사용하기

Celery Standalone이란?

Celery Standalone은 Celery를 웹 프레임워크나 다른 애플리케이션과 통합하지 않고 독립적으로 사용하는 방식을 말합니다. 이는 Celery의 강력한 분산 작업 처리 기능을 단독으로 활용할 수 있게 해줍니다.

Celery Standalone의 장점

  1. 간단한 설정: 웹 프레임워크와의 통합 없이 빠르게 시작할 수 있습니다.
  2. 유연성: 다양한 스크립트나 프로그램에서 쉽게 사용할 수 있습니다.
  3. 독립성: 다른 시스템에 의존하지 않고 작업을 처리할 수 있습니다.
  4. 확장성: 필요에 따라 쉽게 작업자를 추가하거나 제거할 수 있습니다.

Celery Standalone 설정 및 사용 방법

1. 프로젝트 구조 설정

먼저, 다음과 같은 구조로 프로젝트를 설정합니다:

1
2
3
4
celery_project/
├── celery_app.py
├── tasks.py
└── run_tasks.py

2. Celery 애플리케이션 설정 (celery_app.py)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from celery import Celery

app = Celery('celery_project',
             broker='redis://localhost:6379',
             backend='redis://localhost:6379',
             include=['tasks'])

# 선택적 설정
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

3. 태스크 정의 (tasks.py)

1
2
3
4
5
6
7
8
9
10
11
12
from celery_app import app
import time

@app.task
def add(x, y):
    time.sleep(5)  # 작업이 오래 걸리는 것을 시뮬레이션
    return x + y

@app.task
def multiply(x, y):
    time.sleep(3)  # 작업이 오래 걸리는 것을 시뮬레이션
    return x * y

4. 태스크 실행 스크립트 (run_tasks.py)

1
2
3
4
5
6
7
8
9
10
11
from tasks import add, multiply

# 비동기적으로 태스크 실행
result1 = add.delay(4, 4)
result2 = multiply.delay(4, 4)

print("태스크가 백그라운드에서 실행 중입니다.")

# 결과 가져오기
print("덧셈 결과:", result1.get())
print("곱셈 결과:", result2.get())

5. Celery 워커 실행

터미널에서 다음 명령을 실행하여 Celery 워커를 시작합니다:

1
celery -A celery_app worker --loglevel=info

6. 태스크 실행

다른 터미널 창에서 다음 명령을 실행하여 태스크를 시작합니다:

1
python run_tasks.py

Celery Task Routing

Task Routing은 Celery의 고급 기능 중 하나로, 특정 태스크를 특정 큐나 워커로 보내는 방법을 제공합니다. 이는 작업 부하를 분산하거나, 특정 작업을 특정 리소스에 할당하는 데 유용합니다.

Task Routing의 장점

  1. 작업 부하 분산: 다양한 유형의 작업을 여러 워커에 분산시켜 효율성을 높일 수 있습니다.
  2. 리소스 최적화: 특정 작업을 특정 하드웨어 리소스에 할당할 수 있습니다.
  3. 우선순위 관리: 중요한 작업을 우선적으로 처리할 수 있는 전용 큐를 만들 수 있습니다.
  4. 격리: 문제가 있는 작업을 격리하여 다른 작업에 영향을 미치지 않도록 할 수 있습니다.

Task Routing 설정 방법

  1. 큐 정의: 먼저, 사용할 큐를 정의합니다.
1
2
3
4
5
app.conf.task_routes = {
    'tasks.add': {'queue': 'arithmetic'},
    'tasks.multiply': {'queue': 'arithmetic'},
    'tasks.data_processing': {'queue': 'data'},
}
  1. 태스크에 큐 할당: 태스크를 정의할 때 특정 큐를 지정할 수 있습니다.
1
2
3
@app.task(queue='arithmetic')
def add(x, y):
    return x + y
  1. 워커 실행: 특정 큐를 처리하는 워커를 실행합니다.
1
celery -A celery_app worker --loglevel=info -Q arithmetic

고급 Routing 기능

  1. 동적 라우팅: 런타임에 라우팅 결정을 할 수 있습니다.
1
2
3
@app.task(queue=lambda: 'arithmetic' if random.random() > 0.5 else 'data')
def random_task():
    pass
  1. 라우팅 패턴: 정규 표현식을 사용하여 태스크를 라우팅할 수 있습니다.
1
2
3
4
app.conf.task_routes = {
    'tasks.data.*': {'queue': 'data'},
    'tasks.arithmetic.*': {'queue': 'arithmetic'},
}
  1. 우선순위 설정: 큐에 우선순위를 부여할 수 있습니다.
1
2
3
4
5
6
app.conf.task_queue_max_priority = 10
app.conf.task_default_priority = 5

@app.task(priority=7)
def high_priority_task():
    pass

Task Routing 사용 시 주의사항

  1. 복잡성 관리: 너무 많은 큐를 만들면 시스템이 복잡해질 수 있습니다.
  2. 리소스 할당: 각 큐에 적절한 수의 워커를 할당해야 합니다.
  3. 모니터링: 각 큐의 성능을 모니터링하고 필요에 따라 조정해야 합니다.
  4. 데드락 방지: 순환 의존성이 있는 작업들이 서로 다른 큐에 있지 않도록 주의해야 합니다.

결론

Celery Standalone 버전을 사용하면, 복잡한 웹 프레임워크 통합 없이도 강력한 분산 작업 처리 시스템을 구축할 수 있습니다. 여기에 Task Routing을 활용하면, 더욱 효율적이고 유연한 작업 처리 시스템을 만들 수 있습니다.

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.