celery 無法 inspect

環境:

  • Celery 4.2.1
  • Broker backend: RabbitMQ
  • Result backend: Redis

想使用 celery inspect 來查 memory leak 問題,但是 celery inspect 時,都會出現 Error: No nodes replied within time constraint. 的訊息。

察看 log 以後,確定當 celery log 出現 warning,connection reset by peer 時,celery status 就會出現 “Error: No nodes replied within time constraint.” ,在這個同時,rabbitmq server log 也會出現 Missed heartbeats from client, timeout: 20s 的 ERROR ,有看過以下 issues,但沒幫助

後來找到這篇 rabbitmq报Heartbeat missing with heartbeat = 60 seconds ,參考內容把 celery 的 broker_heartbeat 設定改為 0,避免 RabbitMQ 做 heartbeat 的檢查。看來就解決了。

原理的解說可以參考這篇:Detecting Dead TCP Connections with Heartbeats and TCP Keepalives 大致上是說,RabbitMQ 有連線 heartbeat,當 client 沒回應的時候,RabbitMQ 會主動斷掉。而這個 heartbeat 是可以透過 client 在初始化連線時去設定的,所以調整 celery 設定,請 RabbitMQ 不要做 heartbeat 檢查就可以了。

Celery log 出現 Received and deleted unknown message. Wrong destination

在查 Periodic task 為什麼沒執行,beat 是有發出訊息,但 task 卻沒被執行。在 worker log 裡找到

Received and deleted unknown message. Wrong destination

的訊息,查了以後,找到這些資料:

所有的矛頭都指向 librabbitmq ,所以解法有兩種,一種是移除 librabbitmq,一種則是將 protocol 改為 1

 CELERY_TASK_PROTOCOL = 1

Celery Best Practice 筆記

邊看這篇 Celery – Best Practices 邊做的簡單摘錄與筆記。

  1. 不要用資料庫當作 AMQP Broker。Celery 會建立數個 process 去 poll 資料庫來檢查是否有新的工作,這會導致資料庫的 disk I/O 增加,也會增加對資料庫的連接數目。
  2. 使用更多佇列 (不要只用一個)。
    並不是所有的 task 執行時間、次數跟權重都一樣,例如不重要的 task A 可能會執行很多次,但比較重要的 task B 只有零星幾個。一個佇列會導致 Celery 依序執行佇列裡的工作,所以前面可能會排了很多 task A 工作,就多花了許多時間執行,反而重要的 task B 工作延後了。依照 task 分佇列擺放,可以避免這樣的情況。
  3. 使用權重。Celery 可以針對佇列設定權重與分流,可以使用專門的 worker 來服務指定的佇列,讓 task 得到更好的服務。
    
       CELERY_QUEUES = (
        Queue('default', Exchange('default'), routing_key='default'),
        Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
        Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
       )
       CELERY_ROUTES = {
        'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
        'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
       }
    
    
       celery worker -E -l INFO -n workerA -Q for_task_A
       celery worker -E -l INFO -n workerB -Q for_task_B
    
  4. 使用 Celery 的錯誤處理機制。task 可以指定這些參數 default_retry_delay=300, max_retries=5 來指定重試間隔與重試次數。task 裏面只要使用 try…except 跟 self.retry 就可以了。
    
       @app.task(bind=True, default_retry_delay=300, max_retries=5)
       def my_task_A():
         try:
           print("doing stuff here...")
         except SomeNetworkException as e:
           print("maybe do some clenup here....")
           self.retry(e)     # Retry!
    
  5. 使用 Flower。這是一個只要裝上就能使用的 Module,可以用來觀察 Task/Queue 的狀況。
  6. 只有在真的需要時,才保留執行結果。不需要的話,就加上 CELERY_IGNORE_RESULT = True,Celery 會自動丟棄結果。
  7. 不要傳遞 ORM 物件給 task。這是因為 Celery 是用 serialization 方式來傳遞參數到別的 Process (Task 是在其他的 Process 上執行),預設可以使用 pickle, cPickle, JSON, YAML ,但是 serialization/deserialization 是有負擔的,而且不保證所有狀態都能保存,建議最好是 pure 的物件或是用整數、字串等比較不容易出狀況的型態。

Celery 的 autodiscover_tasks

跟 Django 的整合可以參考 First steps with Django

裏面會要求你在 django app 的目錄下新增一個 celery.py,這裡有一行 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) ,這行就是找到所有 tasks 的關鍵。找不到 task 的話,執行 python manage.py celeryd 時,不會有錯誤訊息,只在程式要執行這些 task 時印出錯務訊息,說找不到。

原始碼是在 celery/app/base.py 裡,大致就是依照 INSTALLED_APPS 列出的 package 去找 tasks,有的話就 import 進來。如果你的 celery task 沒有列在 INSTALLED_APPS 裡,或是函式不在 tasks 裡的話,可以再多加 app.autodiscover_tasks,例如 app.autodiscover_tasks([‘your_module’], related_name=’my_tasks’) ,這樣就可以引入使用了。