New RabbitMQ Streams Feature #281

Open
hanzo2001 opened this issue Mar 30, 2022 · 1 comment
Labels
enhancement New feature or request

Comments

@hanzo2001

As of RabbitMQ 3.9 there is a new feature called Streams.

I was testing a small Apache Camel and RabbitMQ Streams setup until I found a configuration that works. I then swapped the ConnectionFactory for the mock version. Almost everything works as expected, but I will list the few details that this mocking tool might want to add.

Regarding declaration, there is a new queue type: stream (the mock still works, but the type will become important for the behavior under test). There are many other details, but those are policy driven, and I don't know if they would be interesting for this mock tool. Binding to exchanges works exactly the same as for any classic queue.

As far as the declaration goes, Streams

  • must have autoAck set to false,
  • must store all its messages in an addressable fashion
    • by index (for offsets)
    • by timestamp (for dates)

Regarding message persistence, Streams always persist everything, and consumers cannot cause messages to disappear from the Stream. RabbitMQ does have a mechanism to truncate the Stream itself by way of retention policies (max-age, etc.). Again, I don't know if these features would come in handy for a mock tool.
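A minimal sketch of the storage this implies, assuming the mock keeps each stream as an in-memory append-only list. The class `StreamLog` and its methods are hypothetical names for illustration, not part of rabbitmq-mock, and "closest timestamp" is interpreted here as "first message at or after the timestamp", which is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical append-only log for a mocked stream queue (illustration only). */
final class StreamLog {
    /** One stored message; its offset is its position in the log. */
    static final class Entry {
        final long offset;
        final long timestampMillis;
        final byte[] body;

        Entry(long offset, long timestampMillis, byte[] body) {
            this.offset = offset;
            this.timestampMillis = timestampMillis;
            this.body = body;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Appends a message and returns the 0-based offset assigned to it. */
    synchronized long append(long timestampMillis, byte[] body) {
        long offset = entries.size();
        entries.add(new Entry(offset, timestampMillis, body));
        return offset;
    }

    /** Reads by offset. Reading never removes the entry from the log. */
    synchronized Entry read(long offset) {
        return entries.get((int) offset);
    }

    /**
     * Returns the offset of the first entry whose timestamp is at or after the
     * given one, or size() if there is none. Assumes timestamps never decrease.
     */
    synchronized long offsetAtOrAfter(long timestampMillis) {
        int lo = 0;
        int hi = entries.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (entries.get(mid).timestampMillis < timestampMillis) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    synchronized long size() {
        return entries.size();
    }
}
```

Retention (max-age and similar policies) is left out of this sketch; it would amount to dropping entries from the head while keeping the offsets of the remaining entries unchanged.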

Consumers must be able to choose where to start reading the stored messages, using one of the following offset specifications:

  • "first" starts at the first available message;
  • "last" starts at the last written chunk (chunking is the storage segmentation mechanism; it would have to be incorporated for this option to make sense, and I don't know if that is overkill for this tool);
  • "next" starts at the end and consumes only offsets written to the stream after the consumer attaches; this is the default if the offset is not declared;
  • a numerical offset (long) is self-explanatory (0-based indexing);
  • a timestamp (java.util.Date) starts consuming from the closest message timestamp;
  • an interval string (e.g. "1D 20h 15m 22s") starts consuming from the date obtained by subtracting the interval from the current date.
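The offset specifications above could be resolved to a concrete starting offset roughly as follows. This is a sketch under several assumptions: `OffsetSpecs` is a hypothetical helper, the stream is represented only by an array of message timestamps (index equals offset), "last" is mapped to the last message because the sketch has no chunking, numeric offsets outside the log are clamped to its bounds, and only the D, h, m and s interval units are handled.

```java
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical resolver from an x-stream-offset value to a starting offset. */
final class OffsetSpecs {
    private static final Pattern PART = Pattern.compile("(\\d+)([Dhms])");

    /** Parses an interval such as "1D 20h 15m 22s" into milliseconds. */
    static long parseIntervalMillis(String interval) {
        long total = 0;
        for (String token : interval.trim().split("\\s+")) {
            Matcher m = PART.matcher(token);
            if (!m.matches()) {
                throw new IllegalArgumentException("bad interval part: " + token);
            }
            long n = Long.parseLong(m.group(1));
            switch (m.group(2)) {
                case "D": total += n * 86_400_000L; break;
                case "h": total += n * 3_600_000L; break;
                case "m": total += n * 60_000L; break;
                default:  total += n * 1_000L; break; // "s"
            }
        }
        return total;
    }

    /**
     * Resolves a spec ("first", "last", "next", a Number, a Date or an interval
     * string) to the offset of the first message to deliver. A result equal to
     * timestamps.length means "only messages published from now on".
     */
    static long resolve(Object spec, long[] timestamps, long nowMillis) {
        int size = timestamps.length;
        if (spec == null || "next".equals(spec)) return size;
        if ("first".equals(spec)) return 0;
        if ("last".equals(spec)) return Math.max(0, size - 1); // assumption: no chunking
        if (spec instanceof Number) {
            long offset = ((Number) spec).longValue();
            return Math.min(Math.max(0, offset), size); // assumption: clamp to the log
        }
        long from;
        if (spec instanceof Date) {
            from = ((Date) spec).getTime();
        } else if (spec instanceof String) {
            from = nowMillis - parseIntervalMillis((String) spec);
        } else {
            throw new IllegalArgumentException("unsupported offset spec: " + spec);
        }
        // Linear scan for the first message at or after the requested instant.
        for (int i = 0; i < size; i++) {
            if (timestamps[i] >= from) return i;
        }
        return size;
    }
}
```

Passing the current time in as a parameter keeps the interval case deterministic in tests.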

Consumers choose the Stream offset to start reading from through the x-stream-offset consumer argument ("next" if undeclared), which accepts any of the kinds of values described above.

In addition, Stream consumers

  • must have qos (a prefetch count) defined on the channel,
  • must manually acknowledge each delivery,
  • must somehow store the offset in a durable medium to survive a reboot (RabbitMQ supposedly handles this for Streams, but the AMQP 0.9.1 spec and clients do not make use of it),
  • must be given the offset of each message in message.properties.headers["x-stream-offset"] so that they can store it and pick up after a reboot.
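On the consumer side, the offset bookkeeping described in the last two points could look like the sketch below. `FileOffsetStore` is a hypothetical class, a plain file stands in for the "durable medium", and starting from "first" when nothing has been stored yet is a choice made for this example, not something the source prescribes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical client-side store for the last processed stream offset. */
final class FileOffsetStore {
    private final Path file;

    FileOffsetStore(Path file) {
        this.file = file;
    }

    /** Records the offset (taken from the x-stream-offset header) of the last processed message. */
    void save(long lastProcessedOffset) {
        try {
            Files.write(file, Long.toString(lastProcessedOffset).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Returns the value to pass as the x-stream-offset consumer argument after a
     * restart: the stored offset plus one, or "first" if nothing was stored yet.
     */
    Object resumeFrom() {
        if (!Files.exists(file)) {
            return "first"; // assumption: replay the whole stream on first start
        }
        try {
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return Long.parseLong(text) + 1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A consumer would call `save` after acknowledging each delivery and put the result of `resumeFrom` into its consumer arguments when it subscribes again.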

The RabbitMQ team is already working on a RabbitMQ Java Stream client library, but the AMQP 0.9.1 client is supposed to provide the low-level features so that implementers can choose their own method.

In summary, a healthy minimum set of features could be

  • a new queue type: stream
    • Streams keep all messages stored
    • Streams keep a numerical index
    • Streams keep a timestamp index
  • stream consumers
    • must not autoAck (consumer should fail to set up),
    • must define qos (prefetch) (consumer should fail to set up),
    • should provide the option "x-stream-offset" ("first", "next", offset, timestamp)
    • must be provided the current message offset in the headers under "x-stream-offset" of each message.
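The consumer rules in this summary could be enforced by the mock along these lines. This is only a sketch: `StreamConsumerRules` is a hypothetical helper, and `IllegalStateException` is a placeholder for whatever error the mock would actually raise when a consumer fails to set up.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical checks a mock could run when a consumer subscribes to a stream queue. */
final class StreamConsumerRules {
    static final String OFFSET_HEADER = "x-stream-offset";

    /** Rejects consumer setups that violate the stream requirements. */
    static void validate(boolean autoAck, int prefetchCount) {
        if (autoAck) {
            throw new IllegalStateException("stream consumers must use manual acknowledgements");
        }
        if (prefetchCount <= 0) {
            throw new IllegalStateException("stream consumers must define a prefetch (qos)");
        }
    }

    /** Returns a copy of the message headers with the message's offset added. */
    static Map<String, Object> withOffsetHeader(Map<String, Object> headers, long offset) {
        Map<String, Object> copy = new HashMap<>();
        if (headers != null) {
            copy.putAll(headers);
        }
        copy.put(OFFSET_HEADER, offset);
        return copy;
    }
}
```

Copying the headers instead of mutating them keeps the stored message unchanged, so every consumer of the stream sees the same original content plus its own offset header.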

More Info at the source

@hanzo2001
Author

Fork with branch

As a follow-up, I forked the project and started implementing this feature on a branch. It is a very naive implementation that reuses a lot of the existing code. My intention is for the tests to describe (in code) how the feature should behave, but I am still missing quite a few of them.

  • stream acts as a fanout for all consumers
  • all consumers of a stream can start reading at any offset till the end
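To illustrate the two behaviors above, here is a small model with one shared log and an independent cursor per consumer. It is not the code from the branch; `FanoutStream` and `Cursor` are hypothetical names, and message bodies are plain strings to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: one shared log, one independent cursor per consumer. */
final class FanoutStream {
    private final List<String> log = new ArrayList<>();

    void publish(String body) {
        log.add(body);
    }

    /** Opens a cursor at the given offset, clamped to the bounds of the log. */
    Cursor open(long startOffset) {
        return new Cursor((int) Math.max(0, Math.min(startOffset, log.size())));
    }

    /** A consumer's private read position; advancing it never alters the log. */
    final class Cursor {
        private int next;

        private Cursor(int start) {
            this.next = start;
        }

        /** Returns every message from the cursor position to the current end. */
        List<String> drain() {
            List<String> out = new ArrayList<>(log.subList(next, log.size()));
            next = log.size();
            return out;
        }
    }
}
```

Because each cursor only holds an index, any number of consumers can read the same messages, and a consumer that attaches later can still start from offset 0.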

The branch is here: https://github.com/hanzo2001/rabbitmq-mock/tree/feature/naive-streams-implementation

Do I need to open a PR for a checkup? (my stuff is not PR ready yet)

TODO

I have not tested the expected errors (no qos, etc.). I have no clue what is supposed to happen with other Channel calls such as basicNack and basicReject in the context of Streams; I only understand consumption as read and continue (Ack, then move on to the next message). Streams have no DLQ-style concept of undeliverable messages: messages are not consumed, and they exist forever or until the retention policy removes them. In addition, I think there is no concept of requeue.

@ledoyen ledoyen added the enhancement New feature or request label Apr 1, 2022