
[WIP] added initial redis cluster transport support code #1021

Closed

auvipy wants to merge 3 commits into celery:master from auvipy:redcls

Conversation

@auvipy (Member) commented Mar 11, 2019

working on new tests

@georgepsarakis (Contributor) left a comment

I noticed that this is still work in progress, but I see several similarities and common classes with the Redis transport.

For example:

So apart from the connection / connection-pool classes and the differences in URL connection parameters needed to support Redis Cluster, I think it would be nice if we could import all common functionality from the Redis transport and override it only where necessary.
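The suggestion above could be sketched roughly as follows. The real base classes would be kombu.transport.redis.Channel and Transport; they are stubbed out here so the example runs stand-alone, and the startup_nodes parameter is only an illustration of a cluster-specific connection option:

```python
# Hedged sketch: subclass the existing Redis transport and override only
# the connection-related pieces. `Channel`/`Transport` are stand-ins for
# kombu.transport.redis.Channel/Transport, not the real classes.

class Channel:
    def _connparams(self):
        return {'host': 'localhost', 'port': 6379}

class Transport:
    Channel = Channel

class ClusterChannel(Channel):
    """Reuse everything; override only what Redis Cluster needs."""
    def _connparams(self):
        params = super()._connparams()
        # A cluster-aware client takes startup nodes instead of a single host.
        params['startup_nodes'] = [
            {'host': params.pop('host'), 'port': params.pop('port')},
        ]
        return params

class ClusterTransport(Transport):
    Channel = ClusterChannel
```

Everything not overridden (QoS, message encoding, queue operations) would then come from the Redis transport unchanged.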


return hosts, password, path

def _connparams(self, _r210_options=(
Contributor:

This tuple should probably be moved to a constant?

@auvipy (Member Author):
will check

@iwinux commented Mar 20, 2019

I've got a weird error when using SimpleQueue.get() with this new transport:

In [8]: m = q.get(timeout=1)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-8-662154a51631> in <module>
----> 1 m = q.get(timeout=1)

/usr/local/lib/python3.7/dist-packages/kombu/simple.py in get(self, block, timeout)
     59                 #    messages are sent over the same socket; also POSIX makes
     60                 #    no guarantees against socket calls returning early.
---> 61                 self.channel.connection.client.drain_events(timeout=remaining)
     62             except socket.timeout:
     63                 raise self.Empty()

/usr/local/lib/python3.7/dist-packages/kombu/connection.py in drain_events(self, **kwargs)
    313             socket.timeout: if the timeout is exceeded.
    314         """
--> 315         return self.transport.drain_events(self.connection, **kwargs)
    316
    317     def maybe_close_channel(self, channel):

/usr/local/lib/python3.7/dist-packages/kombu/transport/virtual/base.py in drain_events(self, connection, timeout)
    961         while 1:
    962             try:
--> 963                 get(self._deliver, timeout=timeout)
    964             except Empty:
    965                 if timeout is not None and monotonic() - time_start >= timeout:

TypeError: get() got multiple values for argument 'timeout'

It works fine when using kombu.transport.redis.

@iwinux commented Mar 20, 2019

Seems that MultiChannelPoller.get should have signature get(self, callback, timeout=None) instead of get(self, timeout=None)?
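For reference, the mismatch can be reproduced in isolation. The classes below are minimal stand-ins mirroring the call made in kombu/transport/virtual/base.py, not kombu's actual code:

```python
# Minimal reproduction of the TypeError above: virtual/base.py calls
# get(self._deliver, timeout=timeout), so the poller's get() must accept
# a positional callback argument. These classes are illustrative stubs.

class BrokenPoller:
    def get(self, timeout=None):            # callback binds to `timeout`...
        return timeout

class FixedPoller:
    def get(self, callback, timeout=None):  # ...so a callback slot is needed
        return callback, timeout

def drain_events(poller, deliver, timeout):
    # mirrors the call site in kombu.transport.virtual.base
    return poller.get(deliver, timeout=timeout)

deliver = lambda message: message

try:
    drain_events(BrokenPoller(), deliver, 1.0)
except TypeError as exc:
    print(exc)  # the same TypeError as in the traceback above

print(drain_events(FixedPoller(), deliver, 1.0)[1])  # 1.0
```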

@auvipy (Member Author) commented Mar 20, 2019

Thanks for the testing and feedback. I will check and try to push a fix ASAP. Keep in mind it's a work in progress right now.

@iwinux commented Mar 22, 2019

I managed to trim down the code, inheriting most of it from kombu.transport.redis, and fixed some corner cases (see #1025), resulting in a 250-line module. Some hardcoded things (e.g. pipeline(True) in QoS) still forced code duplication, though.

Status so far:

  1. celery worker seems to work well on a Redis Cluster (3 masters + 3 replicas) with multiple producers and consumers.

  2. all celery inspect commands are broken for now because the PUBSUB part is not compatible (yet).

  3. there are some memory-leak issues related to heartbeat (might be PUBSUB's fault too); see celery#5047 (memory leak from writing in a loop to a broken RabbitMQ heartbeat connection).

Thank you very much for your PR. Otherwise I wouldn't even have known where to get started. 👍
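The pipeline(True) hardcoding mentioned above can be sketched like this. pipeline(True) requests a transactional (MULTI/EXEC) pipeline, which Redis Cluster cannot run across hash slots, so a cluster transport wants transaction=False; the client classes below are stubs, not redis-py:

```python
# Why a hardcoded pipeline(True) forces duplication: the single-node QoS
# always asks for a transactional pipeline, so the only way to change the
# flag is to copy the whole method. FakeClient/FakePipeline are stubs.

class FakePipeline:
    def __init__(self, transaction):
        self.transaction = transaction

class FakeClient:
    def pipeline(self, transaction=True):
        return FakePipeline(transaction)

class QoS:
    def __init__(self, client):
        self.client = client

    def _pipeline(self):
        return self.client.pipeline(True)  # hardcoded MULTI/EXEC pipeline

class ClusterQoS(QoS):
    def _pipeline(self):
        # cluster-safe override: no transaction across hash slots
        return self.client.pipeline(transaction=False)

print(QoS(FakeClient())._pipeline().transaction)         # True
print(ClusterQoS(FakeClient())._pipeline().transaction)  # False
```

If the flag were a class attribute or a method parameter instead, the cluster transport could override it without duplicating the surrounding logic.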

@auvipy (Member Author) commented Mar 23, 2019

@iwinux thanks for trying this. If you don't mind, could you push your improvements to my branch? Since we are both working on it, why duplicate the effort? Let's polish it together; does that sound OK? We will both get credit for this.

@iwinux commented Mar 24, 2019

That sounds great. I'll send you a pull request :)

@auvipy (Member Author) commented Mar 24, 2019

amazing! thanks!

@iwinux commented Mar 25, 2019

The pull request to this pull request is here: https://github.com/auvipy/kombu/pull/2

I've tried to simplify it further but got stuck here. I'm not sure how to implement a new transport properly: useful information is buried deep in the existing code, obscured by event polling / callbacks, backend specifics, error handling, etc.

Also, would it be a good idea to refactor kombu.transport.redis so that common code could be shared more cleanly, compared to the current hacky method overrides?

@yakhira commented Jul 16, 2019

Guys, I hope you didn't give up on that change :)

@auvipy auvipy self-assigned this Jul 16, 2019
@auvipy (Member Author) commented Jul 16, 2019

This is on the list; please donate to Celery.

@aboganas commented:

is this still active?

@auvipy (Member Author) commented Feb 12, 2020

> is this still active?

Can you test this on a local machine?

@youguanxinqing commented Aug 23, 2020

> is this still active?
>
> can you test this on local machine?

I want to test it on my machine, but I found it didn't work: package path errors, class name errors, and so on. After I fixed these simple errors, I got "ERROR sending 'cluster slots' command to redis server". So, is it still active?

@NullYing commented:

is this still active?

@auvipy (Member Author) commented Dec 18, 2020

I removed my personal fork, so this needs copy-pasting if someone wants to try and contribute.

@ukm21 commented May 18, 2021

Has anyone fixed this issue?

File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 113, in synloop
    connection.drain_events(timeout=2.0)
File "/usr/local/lib/python3.6/site-packages/kombu/connection.py", line 323, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 963, in drain_events
    get(self._deliver, timeout=timeout)
TypeError: get() got multiple values for argument 'timeout'

@dligthart commented:

Is this still active? It would be really nice to have.

@jeffreybrowning commented Jul 9, 2021

@dligthart In my experience, the celery team is stretched pretty thin and @auvipy here is suggesting someone jump in and contribute to the project if they want this done.

Also, contributors seem to be more hesitant to get into the redis backend, as it needs more work.

@auvipy (Member Author) commented Jul 9, 2021

I might revisit it in the near future if no one takes this over.

@aboganas commented Jul 9, 2021

Is there a way to financially contribute to such a feature?

@auvipy (Member Author) commented Jul 10, 2021

> Is there a way to financially contribute to such a feature?

You can reach me at: auvipy@gmail.com

@auvipy auvipy modified the milestones: 5.1.0, 6.0 Sep 20, 2021
@pabclsn commented Nov 8, 2022

any news on this one ;) ?

@benedikt-bartscher commented:

I would love to see this implemented; thanks for your work so far :)

@haobibo commented Dec 20, 2022

@auvipy I see this FR was added to milestones 5.1.0 and 6.0. Is it implemented somewhere else?
This would be a really useful feature for many use cases.

@auvipy (Member Author) commented Dec 21, 2022

> @auvipy I see this FR added milestone 5.1.0 and 6.0. Is it implemented somewhere else? This will be a really useful feature for many use cases.

This will be moved somewhere else for now.

@brunotacca commented:

Is this in progress or being tracked somewhere else in this repo?

Thanks

@zs-neo commented Oct 22, 2024

@auvipy Is the work still ongoing? Celery is very important to our company, and we have found that it does not support Redis Cluster. We want to add support in Kombu; does this issue still accept PRs?

@auvipy (Member Author) commented Oct 22, 2024

Yes it does

@LeoNumber1 commented:

> @auvipy Is the work still ongoing? Celery is very important to our company, and we have found that it does not support Redis Cluster. We want to support it on Kombu; does this issue still accept PRs?

@zs-neo Hello, my project needs Celery to work on a Redis cluster. Can we have a conversation? Thanks.
Email: liyuan.go@bytedance.com

@zzJinux commented Dec 7, 2024

min_priority = 0
max_priority = 0
priority_steps = [min_priority]

Is priority queue not supported for redis cluster transport?
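For context, the single-node Redis transport emulates priorities by routing each configured priority step to its own list key, so collapsing the steps to a single value, as in the snippet quoted above, effectively disables priorities for this transport. A rough sketch, with an assumed separator and rounding rule rather than the transport's literal code:

```python
# Sketch of kombu-style priority emulation on Redis: each priority step
# gets its own list, named queue + separator + step. The separator and
# the rounding rule here are illustrative assumptions.

PRI_STEPS = [0, 3, 6, 9]  # the single-node transport's default steps
SEP = '\x06\x16'

def q_for_pri(queue, pri, steps=PRI_STEPS):
    # round the requested priority down to the nearest configured step
    step = max((s for s in steps if s <= pri), default=steps[0])
    return f'{queue}{SEP}{step}' if step else queue

print(q_for_pri('tasks', 0))             # tasks
print(q_for_pri('tasks', 9, steps=[0]))  # tasks (one step: no priorities)
```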

@Ragnarow commented Feb 4, 2025

Hello, can the MR be merged for the next release?

@auvipy (Member Author) commented Feb 5, 2025

that PR is scheduled for v5.6

@jnu commented Feb 14, 2025

> that PR is scheduled for v5.6

@auvipy do you mean this redis-cluster transport is slated for v5.6? Or is it something you still need help with? Or both?

Nevermind, I'm following along in the other PR now.
