-
Notifications
You must be signed in to change notification settings - Fork 26.6k
Unify the deframer of the client and server sides for the Triple protocol #16041
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 3.3
Are you sure you want to change the base?
Conversation
|
The pack and unpack logic still remain to be made compatible in the next PR. |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## 3.3 #16041 +/- ##
=========================================
Coverage 60.74% 60.74%
+ Complexity 11754 11745 -9
=========================================
Files 1949 1948 -1
Lines 88898 88878 -20
Branches 13407 13404 -3
=========================================
- Hits 53998 53986 -12
Misses 29337 29337
+ Partials 5563 5555 -8
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
565c117 to
05f0b17
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request unifies the deframing logic for the Triple protocol by replacing the client-side specific TriDecoder with a shared GrpcStreamingDecoder that's already used on the server side. This consolidation eliminates code duplication and improves maintainability.
Changes:
- Unified deframing by replacing TriDecoder with GrpcStreamingDecoder for both client and server sides
- Changed Stream.Listener and StreamingDecoder.FragmentListener interfaces to accept InputStream instead of byte[] for better streaming support
- Added BoundedInputStream wrapper to provide mark/reset support required by deserializers
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java | Removed client-specific decoder in favor of unified GrpcStreamingDecoder |
| dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/Deframer.java | Removed interface as StreamingDecoder is now used |
| dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/Stream.java | Updated Listener interface to accept InputStream with messageLength parameter |
| dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java | Switched to GrpcStreamingDecoder and ByteBufInputStream for data handling |
| dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcStreamingDecoder.java | Updated to return MessageStream with InputStream for better streaming support |
| dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java | Added BoundedInputStream to provide mark/reset support and proper message boundary handling |
| dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java | Updated FragmentListener interface to include messageLength parameter |
| dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java | Added InputStream-based parseResponse method with default byte[] delegation |
| dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java | Added mark/reset calls to support reading stream twice for method descriptor determination |
| dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/CompositeInputStream.java | Improved error handling in releaseHeadStream with null check and exception handling |
| dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoderTest.java | Removed test for deleted TriDecoder class |
| dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java | Updated to handle InputStream-based messages |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @Override | ||
| protected byte[] readRawMessage(InputStream inputStream, int length) throws IOException { | ||
| byte[] rawMessage = super.readRawMessage(inputStream, length); | ||
| return compressedFlag ? deCompressedMessage(rawMessage) : rawMessage; | ||
| } | ||
|
|
||
| private byte[] deCompressedMessage(byte[] rawMessage) { | ||
| return deCompressor.decompress(rawMessage); | ||
| protected MessageStream readMessageStream(InputStream inputStream, int length) throws IOException { | ||
| if (compressedFlag) { | ||
| // For compressed messages, we need to read bytes first, then decompress | ||
| byte[] rawMessage = readRawMessage(inputStream, length); | ||
| byte[] decompressed = deCompressor.decompress(rawMessage); | ||
| return new MessageStream(new java.io.ByteArrayInputStream(decompressed), decompressed.length); | ||
| } | ||
| return super.readMessageStream(inputStream, length); | ||
| } | ||
| } |
Copilot
AI
Jan 23, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The refactoring to use GrpcStreamingDecoder (unified deframer) removes the Triple-specific TriDecoder tests without adding equivalent test coverage. While GrpcStreamingDecoder is tested elsewhere, consider adding tests that verify the Triple protocol's specific usage patterns, including: compressed/uncompressed message handling, proper flow control through bytesRead callbacks, and correct integration with the DeCompressor component.
| public static MethodDescriptor findTripleMethodDescriptor( | ||
| ServiceDescriptor serviceDescriptor, String methodName, InputStream rawMessage) throws IOException { | ||
| MethodDescriptor methodDescriptor = findReflectionMethodDescriptor(serviceDescriptor, methodName); | ||
| if (methodDescriptor == null) { |
Copilot
AI
Jan 23, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to rawMessage.mark(Integer.MAX_VALUE) assumes the InputStream supports mark/reset and has a sufficient buffer. While BoundedInputStream (used in the unified deframer) extends BufferedInputStream and supports this, consider adding a check or documentation to ensure all callers pass InputStreams that support mark/reset with adequate buffer size. If an InputStream that doesn't support mark/reset is passed, this will fail silently and the subsequent reset() on line 144 will throw an IOException.
| if (methodDescriptor == null) { | |
| if (methodDescriptor == null) { | |
| if (!rawMessage.markSupported()) { | |
| throw new IOException("InputStream does not support mark/reset, which is required to resolve overloaded triple methods."); | |
| } |
...riple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
Show resolved
Hide resolved
...tp12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
Outdated
Show resolved
Hide resolved
...tp12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
Outdated
Show resolved
Hide resolved
...tp12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
Outdated
Show resolved
Hide resolved
ede41a6 to
52cabcf
Compare
zrlw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This commit unifies the deframer implementation between client and server sides for the Triple protocol, eliminating code duplication and improving maintainability. ## Key Changes ### Deleted Files - TriDecoder.java: Removed client-specific deframer implementation - Deframer.java: Removed client deframer interface - RecordListener.java: Removed test helper - TriDecoderTest.java: Removed corresponding test ### Core Modifications 1. **LengthFieldStreamingDecoder**: Added BoundedInputStream to support: - Message boundary isolation (prevents reading beyond current message) - mark/reset support required by Hessian2 and other deserializers - Zero-copy optimization by reading directly from accumulate stream 2. **StreamingDecoder.FragmentListener**: Added messageLength parameter to onFragmentMessage() method for better flow control 3. **AbstractTripleClientStream**: Migrated from TriDecoder to GrpcStreamingDecoder, using ByteBufInputStream to adapt Netty's ByteBuf to InputStream 4. **Stream.Listener.onMessage**: Changed parameter from byte[] to InputStream for unified handling and memory optimization 5. **PackableMethod**: Added default parseResponse(InputStream) method for backward compatibility 6. **CompositeInputStream**: Improved stream release logic to prevent exceptions 7. **DescriptorUtils**: Added mark() call before reading stream to support reset ## Architecture Change Before: - Client: ByteBuf -> TriDecoder (Netty-specific) -> byte[] -> deserializer - Server: InputStream -> LengthFieldStreamingDecoder -> byte[] -> deserializer After: - Client: ByteBuf -> ByteBufInputStream -> GrpcStreamingDecoder -> BoundedInputStream -> deserializer - Server: InputStream -> GrpcStreamingDecoder -> BoundedInputStream -> deserializer ## Bug Fix Fixed BoundedInputStream.reset() not restoring the 'remaining' counter, which caused streams to return EOF after mark/reset. This was the root cause of POJO method invocation failures with "Unexpected serialization type:null" error.
3128bd2 to
39f647d
Compare
ccf7d0d to
9f2f062
Compare
zrlw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What is the purpose of the change?
Checklist