Enable support for MQTT stitcher in stirling #1918

Open · wants to merge 3 commits into base: main

Conversation

ChinmayaSharma-hue
Contributor

Summary: This PR adds the stitcher component of MQTT (v5), a newly added protocol.

Related issues: #341

Type of change: /kind feature

Test Plan: Added tests

@ChinmayaSharma-hue ChinmayaSharma-hue requested a review from a team as a code owner May 26, 2024 13:20
Signed-off-by: Chinmay <[email protected]>
@ddelnano ddelnano (Member) left a comment

@ChinmayaSharma-hue sorry for the late review and really appreciate all your work on the MQTT protocol support!

Comment on lines 92 to 94
uint8_t request_match_key =
(req_frame.control_packet_type << 4) | static_cast<uint8_t>(req_frame.dup) << 3 |
(req_frame.header_fields["qos"]) << 1 | static_cast<uint8_t>(req_frame.retain);
Member:

I think it would be best to encapsulate this request key creation into a function. The logic is implemented a few times, and MapRequestToResponse itself is hard to understand without reading the stitching code that uses it.

One thought I had for accomplishing this would be to have a custom data type for the map key and a conversion function to go from mqtt::Message to the map key type. This would avoid duplicating the logic and make it more obvious how the map is used.

Here is a somewhat similar example of this kind of data type. I don't think we need to override the hash function to accomplish what I described, but thought this might be useful to share.
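A hedged sketch of what that suggestion could look like. The Message struct here is a simplified stand-in for the PR's mqtt::Message, and MatchKey/GetMatchKey are illustrative names, not the PR's code:

```cpp
#include <cstdint>

// Simplified stand-in for mqtt::Message; field names mirror the snippets in
// this review, but this struct is for illustration only.
struct Message {
  uint8_t control_packet_type;  // 4-bit field per the MQTT v5 fixed header
  bool dup;
  uint8_t qos;  // 2-bit field
  bool retain;
};

// Dedicated key type: the bit-packing logic now lives in exactly one place.
struct MatchKey {
  uint8_t value;
  bool operator==(const MatchKey& other) const { return value == other.value; }
  bool operator<(const MatchKey& other) const { return value < other.value; }
};

// Conversion function from a frame to its map key; takes a const reference to
// avoid copying the frame.
MatchKey GetMatchKey(const Message& frame) {
  return MatchKey{static_cast<uint8_t>(
      (frame.control_packet_type << 4) | (static_cast<uint8_t>(frame.dup) << 3) |
      ((frame.qos & 0x3) << 1) | static_cast<uint8_t>(frame.retain))};
}
```

With this shape, both the map insertion and the response matching can call GetMatchKey instead of repeating the shifts inline.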

Contributor Author:

You mean something like this, right?

MatchKey getMatchKey(mqtt::Message frame) {
  return (frame.control_packet_type << 4) | static_cast<uint8_t>(frame.dup) << 3 |
         (frame.header_fields["qos"] & 0x3) << 1 | static_cast<uint8_t>(frame.retain);
}

Member:

Yep, that's what I had in mind. We should also avoid copying the mqtt::Message when this function is used as well.

Comment on lines 126 to 128
return ((message.control_packet_type << 4) | (static_cast<uint8_t>(message.dup) << 3) |
(message.header_fields["qos"] << 1) | static_cast<uint8_t>(message.retain)) ==
response_match_value;
Member:

This would be more readable with a comparison function like I described above.

Contributor Author:

With the helper function for this logic, it would change to:

auto response_frame_iter = std::find_if(
    first_timestamp_iter, resp_deque.end(), [response_match_value](mqtt::Message& message) {
      return getMatchKey(message) == response_match_value;
    });

Member:

Sounds good to me 👍

for (mqtt::Message& req_frame : req_deque) {
// getting the appropriate response match value for the request match key
uint8_t request_match_key =
(req_frame.control_packet_type << 4) | static_cast<uint8_t>(req_frame.dup) << 3 |
Member:

As I reviewed this to see if there was potential for overflow or for any fields to encroach on another field's bits, I was surprised that control_packet_type was defined as a uint8_t type. After checking the spec again, it does appear that this entire value should fit within 8 bits without overlap (assuming the uint32_t from header_fields truly models a 2-bit value for qos).

Can you add a comment here to mention that this is modeling a 4-bit field?

I also think we should cap req_frame.header_fields["qos"] to 2 bits just to be safe. To accomplish that, I was thinking of the following:

(req_frame.header_fields["qos"] & 0x3) << 1
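To illustrate what the 0x3 mask buys, here is a small self-contained sketch. The helper names and values are hypothetical, not the PR's code; bit 3 is where the dup flag lands in the packed key:

```cpp
#include <cstdint>

// Returns true if an unmasked qos value, shifted into position, would spill
// into bit 3 (the dup flag's position in the packed key).
inline bool QosSpillsIntoDupBit(uint32_t qos) {
  return ((qos << 1) & 0x08) != 0;
}

// With the 0x3 mask, qos is capped to 2 bits and always lands in bits [2:1].
inline uint32_t MaskedQosBits(uint32_t qos) {
  return (qos & 0x3) << 1;
}
```

A valid qos is 0-2, so the mask only matters for corrupt input, but it makes the invariant explicit.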

Contributor Author:

Can you add a comment here to mention that this is modeling a 4 bit field?

I also think we should cap the req_frame.header_fields["qos"] to 2 bits just to be safe.

Sure, will do.
Should I also add a comment in types.h, where control_packet_type is defined as a uint8_t, noting that it models a 4-bit field?

Member:

Thanks for calling that out. Yep, that would be great to have as well.

EXPECT_EQ(result.error_count, 0);
EXPECT_EQ(result.records.size(), 4);
}

Member:

I think there are some additional edge cases we will want to think about:

  1. Duplicate frame during a QoS 1 PUBLISH
  2. Case where the client and server use the same packet identifiers -- this is possible according to the spec and differs from how other protocols that use the StitchFrames map interface work (ID generation is usually client-side only).

[screenshot: excerpt from the MQTT spec]

Case 2 is interesting because I believe the caller of StitchFrames will provide the deques "flip flopped" and cause failed stitching -- meaning StitchFrames will see a PUBLISH frame in one of the response deques if the server initiates the publish.

Have you considered how to handle Case 1? As for Case 2, I believe we will need a reverse mapping in MapRequestToResponse. It would be helpful to see if my hypothesis about the flip flopping of deques happens. We can leave a TODO here for now and wait to address that once we have the system working end to end so we can verify it with real traffic.

Contributor Author:

Duplicate frame during a QoS 1 PUBLISH

I am assuming you mean a duplicate QoS 1 PUBLISH from the client itself with a single PUBACK from the server (caused by retransmission of the PUBLISH after not receiving a PUBACK). This pointed to a bug in the stitcher code: when searching for a response with the matching response key, I was not checking whether the response had already been consumed, so a single response could be matched to multiple duplicate requests. I have added that check, and the code now behaves as follows for a duplicate QoS 1 PUBLISH:

  1. If there are two duplicate PUBLISH frames (QoS 1) in the request deque and one PUBACK frame in the response deque, the first PUBLISH frame is stitched with the PUBACK frame, and the second PUBLISH frame is left in the request deque (with an error count of 0). The code works this way because I did not want to clear request frames at the end of the deque: their responses could still arrive by the next time StitchFrames is called, once the response deque has been refilled. We know no response will come for this duplicate PUBLISH, but I see no problem with that, since the request will be cleared in the next round when more request-response stitching occurs in the deque.
  2. If there are two duplicate QoS 1 PUBLISH frames and one SUBSCRIBE frame (it can be any other request frame) in the request deque, and one PUBACK and one SUBACK frame in the response deque, then the first PUBLISH is stitched with the PUBACK, the SUBSCRIBE is stitched with the SUBACK, the error count is 1, and the duplicate PUBLISH is cleared from the request deque.

Case where the client and server use the same packet identifiers -- this is possible according to the spec and differs from how other protocols that use the StitchFrames map interface work (ID generation is usually client-side only).

This would work fine if the flip-flopping you mentioned does not occur, but if it does, then I will have to add logic to do reverse stitching from response to request as well, as you said. I will add a TODO comment for future reference.

Member:

For case 1, how does the implementation discern between an unanswered QoS 1 PUBLISH frame and a QoS 1 PUBLISH sequence where there are duplicated PUBLISHes and a PUBACK? It seems the next round of stitching might have difficulty telling the two cases apart. IMO the former should be classified as an error, while the latter seems fine to consider a successful case.

Sounds good on the TODO and revisiting this for case 2.

Contributor Author (@ChinmayaSharma-hue, Jun 26, 2024):

how does the implementation discern between an unanswered PUBLISH frame (QOS 1) compared to a PUBLISH sequence (QOS1) where there are duplicated PUBLISHes and a PUBACK?

Actually, the implementation would consider one (or more, if there were several retransmissions) of the duplicated QoS 1 PUBLISH frames as unanswered PUBLISH frames. In the next round of stitching, the unanswered PUBLISH frames are discarded when newer frames with newer responses are stitched.
That logic is present in this code:

while (iter != req_deque.end() && (iter->timestamp_ns < latest_resp_ts)) {
  if (iter->consumed) {
    ++erase_until_iter;
  }
  if (!iter->consumed && !(iter == req_deque.end() - 1) && ((erase_until_iter + 1)->consumed)) {
    ++error_count;
    ++erase_until_iter;
  }
  ++iter;
}
req_deque.erase(req_deque.begin(), erase_until_iter);

where all the requests that have not been consumed before the latest response timestamp are removed. I thought this made sense: if there are responses to subsequent requests after the unanswered PUBLISH, then it has gone unanswered and I need to remove it from the request deque.
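The idea can be boiled down to the following self-contained sketch. Frame and PruneStaleRequests are simplified, hypothetical stand-ins; this is a reduced variant of the idea, not the PR's exact loop (which additionally tracks a separate erase_until_iter):

```cpp
#include <cstdint>
#include <deque>

// Minimal stand-in for a parsed frame; the real mqtt::Message carries more.
struct Frame {
  uint64_t timestamp_ns;
  bool consumed;
};

// Drop every request older than the latest observed response timestamp; any
// unconsumed request among them is counted as an error, since a response for
// it can no longer arrive in order.
int PruneStaleRequests(std::deque<Frame>* req_deque, uint64_t latest_resp_ts) {
  int error_count = 0;
  auto iter = req_deque->begin();
  while (iter != req_deque->end() && iter->timestamp_ns < latest_resp_ts) {
    if (!iter->consumed) {
      ++error_count;  // stale, unanswered request
    }
    ++iter;
  }
  req_deque->erase(req_deque->begin(), iter);
  return error_count;
}
```

Requests newer than the latest response survive to the next stitching round, matching the "response may still arrive" reasoning above.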

But yes, it will give an error count of 1 in cases where there are duplicate PUBLISH frames with a PUBACK. I thought this was fine, since it is the case of an unanswered PUBLISH itself that prompted the retransmission and duplication, so an error count equal to the number of duplicate PUBLISH frames minus one (for the answered PUBLISH) seemed to make sense.

Should I modify the logic so that the error count is zero in cases where the PUBLISH frames finally have an answer in a PUBACK, and non-zero in cases where there are no PUBACKs at all?

Member:

Should I modify the logic so that the error count is zero in cases where the PUBLISH frames finally have an answer in a PUBACK, and non-zero in cases where there are no PUBACKs at all?

Yes, I believe that's the correct thing to do here. Stirling will disable tracing a connection if the stitcher failure rate goes over a certain threshold, so accurately labeling these cases will avoid difficult-to-debug situations later on.

Contributor Author:

I was thinking the way I could do this is by marking the duplicate messages that have been answered as consumed.

Also, the stitching logic would match the PUBLISH with the dup bit set to 0 with the PUBACK, and the PUBLISH with the dup bit set to 1 would be dropped. Is this fine? Or, once a duplicate that has already been stitched with a PUBACK is found, should I replace the PUBLISH in the entries vector with the first PUBLISH frame?


Message connect_frame, pingreq_frame;
frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(kConnect));
ParseFrame(message_type_t::kRequest, &frame_view, &connect_frame);
Member:

The majority of the other stitcher implementations don't rely on parsing frames from bytes (using the ParseFrame function). They usually have a helper CreateFrames function. I think that simplifies things, since many of the payload details aren't necessary for stitching -- MQTT version, message length, property length, etc.

Contributor Author:

I should have done this as well. I have now made a CreateFrames function that takes the type (Request/Response), control_packet_type, packet_identifier, and qos as arguments, and replaced all the code where I was parsing frames.
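A hedged sketch of what such a test helper might look like, given the arguments described above. The Message struct and enum here are simplified stand-ins for the PR's real types in types.h, and CreateFrame is an illustrative name:

```cpp
#include <cstdint>
#include <map>
#include <string>

// Simplified stand-ins for the PR's message_type_t and mqtt::Message.
enum class message_type_t { kRequest, kResponse };

struct Message {
  message_type_t type;
  uint8_t control_packet_type;
  std::map<std::string, uint32_t> header_fields;
};

// Test helper: build a frame directly from the fields the stitcher cares
// about, skipping wire-format details (lengths, properties, version).
Message CreateFrame(message_type_t type, uint8_t control_packet_type,
                    uint32_t packet_identifier, uint32_t qos) {
  Message frame;
  frame.type = type;
  frame.control_packet_type = control_packet_type;
  frame.header_fields["packet_identifier"] = packet_identifier;
  frame.header_fields["qos"] = qos;
  return frame;
}
```

This keeps stitcher tests focused on matching behavior rather than on byte-level parsing.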

#include <utility>

#include "src/common/base/base.h"
#include "src/common/json/json.h"
Member:

This header file seems unused.

Contributor Author:

Removed.

request_match_key);
continue;
}
if (iter->second == 0xff) {
Member:

Creating a variable for this 0xff value at the top-level scope would make this easier to understand:

constexpr uint8_t UnmatchedResp = 0xff;
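For context, a small sketch of how that sentinel might read at a lookup site. The map contents and the helper name here are hypothetical placeholders, not the PR's real request-to-response table:

```cpp
#include <cstdint>
#include <map>

// Sentinel marking request types that have no matching response.
constexpr uint8_t UnmatchedResp = 0xff;

// Returns true if the request key is known and maps to "no response expected".
// A named constant reads far better than a bare 0xff at the comparison site.
inline bool ExpectsNoResponse(const std::map<uint8_t, uint8_t>& req_to_resp,
                              uint8_t key) {
  auto iter = req_to_resp.find(key);
  return iter != req_to_resp.end() && iter->second == UnmatchedResp;
}
```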

Contributor Author:

Done.

Comment on lines +159 to +167
// iterate through all response deques to find out which ones haven't been consumed
for (auto& [packet_id, resp_deque] : *resp_frames) {
for (auto& resp : resp_deque) {
if (!resp.consumed) {
error_count++;
}
}
resp_deque.clear();
}
Member:

For other protocols, we assume that a request will always be seen before a response. That allows us to clear the response deque at the end of each stitching round. I'm not sure that assumption will hold for MQTT, since the server and client can independently initiate PUBLISH requests (as I mentioned in another comment).

Can we add a comment here to make sure we test how server "client" frames work with this logic?

Contributor Author:

This would be a follow-up to the TODO comment for the case where a PUBLISH ends up in the response deque, right? Another TODO comment to make sure we test the clearing of the response deque when that occurs?

Member:

This would be a follow up for the TODO comment for the case where PUBLISH ends up in the response deque, right?

Correct. If my assumption is correct, then I think we will have a bug when the server initiates a PUBLISH -- the heuristic that it's correct to clear the response deque at the end of each stitching round will cause requests to be dropped.

Here is an example of the wording I was thinking of. Let me know if that clarifies things and makes sense:

Verify which deque server side PUBLISH frames are inserted into. It's suspected that these PUBLISH requests will end up in the resp deque and will cause the resp deque cleanup logic to erroneously drop request frames

Contributor Author:

Yeah that comment makes things clear.
