Dostlar, kafayı yiyecektim resmen. Uzun süredir çalışan, milyonlarca mesajı sorunsuz işlemiş bir RabbitMQ consumer'ım vardı. Sonra bir bakıyorum, kuyruktaki mesaj sayısı prefetch_count'u geçmiş, sürekli artıyor. Consumer'ın işlemci ve bellek kullanımı normal, loglarda hiçbir hata yok. Ama mesajlar bir türlü bitmiyor. "Bu mesajları kim yiyor?" diye bakarken, meğerse consumer'ın kendisi yiyormuş da... sindiremiyormuş.
İlk önce network, sonra RabbitMQ sunucusu, derken kodda bir memory leak mi var diye saatlerce profil çektim. pika kütüphanesiyle yazılmıştı consumer. StackOverflow'da bile benzer bir şey bulamadım. En son, işlem sırasında bir log satırı ekledim: "Mesaj işlendi, ACK gönderiliyor...". Logda bu yazıyordu, ama kuyruk azalmıyordu.
Sonra fark ettim. Kod şöyleydi:
Python:
def on_message(channel, method, properties, body):
# Mesajı işle...
process_data(body)
# ACK gönder... (AHHH BU SATIR YORUM SATIRIYDI!)
# channel.basic_ack(delivery_tag=method.delivery_tag)
Şaka gibi ama, channel.basic_ack satırı yorum satırı olarak kalmış! Belki bir debug sırasında kapatmışımdır, commit ederken fark etmemişim. Consumer mesajı alıyor, işliyor, ama RabbitMQ'ya "Bu iş bitti" (basic_ack) sinyalini göndermiyor. RabbitMQ da "Bu consumer hala bu mesajla uğraşıyor, ona yeni mesaj göndermeyeyim" diyerek mesajı unacknowledged durumunda tutuyor. Consumer restart edince, RabbitMQ "Bak bu mesaj işlenmedi" deyip aynı mesajı tekrar gönderiyor. Sonsuz bir döngü!
Çözüm, tüm kuyruğu purge edip, o lanet yorum satırını kaldırmak oldu. Ama asıl ders, auto_ack=False ile çalışıyorsan, ACK göndermeyi ASLA ama ASLA try-except bloğunun dışına bırakmamak. Hata olsa da olmasa da ACK'yi garantilemek lazım. Şöyle bir pattern artı kafama kazındı:
Python:
def on_message(channel, method, properties, body):
try:
process_data(body)
channel.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
log_error(e)
# Belki negative acknowledgement (nack) ile mesajı reddet
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
Siz de böyle "ACK unutma" veya "ACK'yi yanlış yere koyma" gibi basit ama sistemi durduran bir hata yaptınız mı? Prefetch sayısını düşük tutmak böyle durumlarda hasarı sınırlar mı sizce?