diff --git a/java/shared/java/org/signal/libsignal/internal/Native.java b/java/shared/java/org/signal/libsignal/internal/Native.java index 51461f90a..2fc9fffa0 100644 --- a/java/shared/java/org/signal/libsignal/internal/Native.java +++ b/java/shared/java/org/signal/libsignal/internal/Native.java @@ -626,6 +626,7 @@ public final class Native { public static native void TESTING_ChatServiceErrorConvert(String errorDescription) throws Exception; public static native Object TESTING_ChatServiceResponseAndDebugInfoConvert() throws Exception; public static native Object TESTING_ChatServiceResponseConvert(boolean bodyPresent) throws Exception; + public static native void TESTING_ChatService_InjectConnectionInterrupted(long chat); public static native void TESTING_ChatService_InjectRawServerRequest(long chat, byte[] bytes); public static native void TESTING_ErrorOnBorrowAsync(Object input); public static native CompletableFuture TESTING_ErrorOnBorrowIo(long asyncRuntime, Object input); diff --git a/node/Native.d.ts b/node/Native.d.ts index 16ec903d7..016975977 100644 --- a/node/Native.d.ts +++ b/node/Native.d.ts @@ -121,6 +121,7 @@ export abstract class ChatListener { ack: ServerMessageAck ): void; _queue_empty(): void; + _connection_interrupted(): void; } export abstract class MakeChatListener extends ChatListener {} @@ -492,6 +493,7 @@ export function TESTING_ChatServiceDebugInfoConvert(): ChatServiceDebugInfo; export function TESTING_ChatServiceErrorConvert(errorDescription: string): void; export function TESTING_ChatServiceResponseAndDebugInfoConvert(): ResponseAndDebugInfo; export function TESTING_ChatServiceResponseConvert(bodyPresent: boolean): ChatResponse; +export function TESTING_ChatService_InjectConnectionInterrupted(chat: Wrapper): void; export function TESTING_ChatService_InjectRawServerRequest(chat: Wrapper, bytes: Buffer): void; export function TESTING_ErrorOnBorrowAsync(_input: null): Promise; export function TESTING_ErrorOnBorrowIo(asyncRuntime: Wrapper, _input: null): Promise; diff --git a/node/ts/net.ts b/node/ts/net.ts index 9cc8368fe..84f372ead 100644 --- a/node/ts/net.ts +++ b/node/ts/net.ts @@ -140,6 +140,17 @@ export interface ChatServiceListener { * were in the queue *when the connection was established* have been delivered. */ onQueueEmpty(): void; + + /** + * Called when the client gets disconnected from the server. + * + * This includes both deliberate disconnects as well as unexpected socket closures that will be + * automatically retried. + * + * Will not be called if no other requests have been invoked for this connection attempt. That is, + * you should never see this as the first callback, nor two of these callbacks in a row. + */ + onConnectionInterrupted(): void; } /** @@ -186,6 +197,9 @@ export class ChatService { _queue_empty(): void { listener.onQueueEmpty(); }, + _connection_interrupted(): void { + listener.onConnectionInterrupted(); + }, }; Native.ChatServer_SetListener( asyncContext, diff --git a/node/ts/test/NetTest.ts b/node/ts/test/NetTest.ts index dce8739ba..54b19fc97 100644 --- a/node/ts/test/NetTest.ts +++ b/node/ts/test/NetTest.ts @@ -14,6 +14,7 @@ import { ErrorCode, LibSignalErrorBase } from '../Errors'; import { ChatServerMessageAck, ChatService, + ChatServiceListener, Environment, Net, ServiceAuth, @@ -229,6 +230,7 @@ describe('chat service api', () => { const listener = { onIncomingMessage: sinon.stub(), onQueueEmpty: sinon.stub(), + onConnectionInterrupted: sinon.stub(), }; chat.setListener(listener); @@ -279,6 +281,7 @@ describe('chat service api', () => { '_incoming_message', '_queue_empty', '_incoming_message', + '_connection_interrupted', ]; const recordCall = function (name: string) { callsReceived.push(name); @@ -286,7 +289,7 @@ describe('chat service api', () => { completable.complete(); } }; - const listener = { + const listener: ChatServiceListener = { onIncomingMessage( _envelope: Buffer, _timestamp: number, @@ -297,6 +300,9 @@ describe('chat service api', () => { onQueueEmpty(): void { recordCall('_queue_empty'); }, + onConnectionInterrupted(): void { + recordCall('_connection_interrupted'); + }, }; chat.setListener(listener); callsToMake.forEach((message) => @@ -305,6 +311,7 @@ describe('chat service api', () => { message ) ); + Native.TESTING_ChatService_InjectConnectionInterrupted(chat.chatService); await completable.done(); expect(callsReceived).to.eql(callsExpected); }); @@ -315,10 +322,12 @@ describe('chat service api', () => { const listener1 = { onIncomingMessage: sinon.stub(), onQueueEmpty: sinon.stub(), + onConnectionInterrupted: sinon.stub(), }; const listener2 = { onIncomingMessage: sinon.stub(), onQueueEmpty: sinon.stub(), + onConnectionInterrupted: sinon.stub(), }; async function check( @@ -359,6 +368,7 @@ describe('chat service api', () => { const listener = { onIncomingMessage: sinon.stub(), onQueueEmpty: sinon.stub(), + onConnectionInterrupted: sinon.stub(), }; Native.TESTING_ChatService_InjectRawServerRequest( chat.chatService, diff --git a/rust/bridge/node/bin/Native.d.ts.in b/rust/bridge/node/bin/Native.d.ts.in index 36e9b3a73..710fa65f3 100644 --- a/rust/bridge/node/bin/Native.d.ts.in +++ b/rust/bridge/node/bin/Native.d.ts.in @@ -121,6 +121,7 @@ export abstract class ChatListener { ack: ServerMessageAck ): void; _queue_empty(): void; + _connection_interrupted(): void; } export abstract class MakeChatListener extends ChatListener {} diff --git a/rust/bridge/shared/src/ffi/chat.rs b/rust/bridge/shared/src/ffi/chat.rs index af5b4ed75..ed5610624 100644 --- a/rust/bridge/shared/src/ffi/chat.rs +++ b/rust/bridge/shared/src/ffi/chat.rs @@ -16,6 +16,7 @@ type ReceivedIncomingMessage = extern "C" fn( cleanup: *mut ServerMessageAck, ); type ReceivedQueueEmpty = extern "C" fn(ctx: *mut c_void); +type ConnectionInterrupted = extern "C" fn(ctx: *mut c_void); type DestroyChatListener = extern "C" fn(ctx: *mut c_void); /// Callbacks for [`ChatListener`]. @@ -29,6 +30,7 @@ pub struct FfiChatListenerStruct { ctx: *mut c_void, received_incoming_message: ReceivedIncomingMessage, received_queue_empty: ReceivedQueueEmpty, + connection_interrupted: ConnectionInterrupted, destroy: DestroyChatListener, } @@ -73,4 +75,8 @@ impl ChatListener for ChatListenerStruct { fn received_queue_empty(&mut self) { (self.0.received_queue_empty)(self.0.ctx) } + + fn connection_interrupted(&mut self) { + (self.0.connection_interrupted)(self.0.ctx) + } } diff --git a/rust/bridge/shared/src/net/chat.rs b/rust/bridge/shared/src/net/chat.rs index 1db0d4c7b..2e641b666 100644 --- a/rust/bridge/shared/src/net/chat.rs +++ b/rust/bridge/shared/src/net/chat.rs @@ -286,6 +286,7 @@ pub trait ChatListener: Send { ack: ServerMessageAck, ); fn received_queue_empty(&mut self); + fn connection_interrupted(&mut self); } impl dyn ChatListener { @@ -304,7 +305,7 @@ impl dyn ChatListener { ServerMessageAck::new(send_ack), ), chat::server_requests::ServerMessage::QueueEmpty => self.received_queue_empty(), - chat::server_requests::ServerMessage::Stopped => todo!(), + chat::server_requests::ServerMessage::Stopped => self.connection_interrupted(), } } @@ -442,6 +443,14 @@ fn TESTING_ChatService_InjectRawServerRequest(chat: &Chat, bytes: &[u8]) { .expect("not closed"); } +#[cfg(feature = "testing-fns")] +#[bridge_fn] +fn TESTING_ChatService_InjectConnectionInterrupted(chat: &Chat) { + chat.synthetic_request_tx + .blocking_send(chat::ws::ServerEvent::Stopped) + .expect("not closed"); +} + /// Wraps a named type and a single-use guard around [`chat::server_requests::AckEnvelopeFuture`]. pub struct ServerMessageAck { inner: AtomicTake, diff --git a/rust/bridge/shared/src/node/chat.rs b/rust/bridge/shared/src/node/chat.rs index 0708fb017..622b4b7d4 100644 --- a/rust/bridge/shared/src/node/chat.rs +++ b/rust/bridge/shared/src/node/chat.rs @@ -52,6 +52,16 @@ impl ChatListener for NodeChatListener { Ok(()) }); } + + fn connection_interrupted(&mut self) { + let callback_object_shared = self.callback_object.clone(); + self.js_channel.send(move |mut cx| { + let callback = callback_object_shared.to_inner(&mut cx); + let _result = call_method(&mut cx, callback, "_connection_interrupted", [])?; + callback_object_shared.finalize(&mut cx); + Ok(()) + }); + } } pub struct NodeMakeChatListener { diff --git a/swift/Sources/LibSignalClient/ChatListener.swift b/swift/Sources/LibSignalClient/ChatListener.swift index 3a5b0d893..707349bb5 100644 --- a/swift/Sources/LibSignalClient/ChatListener.swift +++ b/swift/Sources/LibSignalClient/ChatListener.swift @@ -18,14 +18,26 @@ public protocol ChatListener: AnyObject { /// Called when the server indicates that there are no further messages in the message queue. /// /// Note that further messages may still be delivered; this merely indicates that all messages - /// that were in the queue + /// that were in the queue *when the connection was established* have been delivered. /// /// The default implementation of this method does nothing. func chatServiceDidReceiveQueueEmpty(_ chat: ChatService) + + /// Called when the client gets disconnected from the server. + /// + /// This includes both deliberate disconnects as well as unexpected socket closures that will be + /// automatically retried. + /// + /// Will not be called if no other requests have been invoked for this connection attempt. That + /// is, you should never see this as the first callback, nor two of these callbacks in a row. + /// + /// The default implementation of this method does nothing. + func chatServiceConnectionWasInterrupted(_ chat: ChatService) } extension ChatListener { - func chatServiceDidReceiveQueueEmpty(_: ChatService) {} + public func chatServiceDidReceiveQueueEmpty(_: ChatService) {} + public func chatServiceConnectionWasInterrupted(_: ChatService) {} } internal class ChatListenerBridge { @@ -74,11 +86,20 @@ internal class ChatListenerBridge { bridge.chatListener.chatServiceDidReceiveQueueEmpty(chatService) } + let connectionInterrupted: SignalConnectionInterrupted = { rawCtx in + let bridge = Unmanaged.fromOpaque(rawCtx!).takeUnretainedValue() + guard let chatService = bridge.chatService else { + return + } + + bridge.chatListener.chatServiceConnectionWasInterrupted(chatService) + } return .init( ctx: Unmanaged.passRetained(self).toOpaque(), received_incoming_message: receivedIncomingMessage, received_queue_empty: receivedQueueEmpty, + connection_interrupted: connectionInterrupted, destroy: { rawCtx in _ = Unmanaged.fromOpaque(rawCtx!).takeRetainedValue() } diff --git a/swift/Sources/SignalFfi/signal_ffi.h b/swift/Sources/SignalFfi/signal_ffi.h index c349c2cde..b61ba613f 100644 --- a/swift/Sources/SignalFfi/signal_ffi.h +++ b/swift/Sources/SignalFfi/signal_ffi.h @@ -634,6 +634,8 @@ typedef void (*SignalReceivedIncomingMessage)(void *ctx, SignalOwnedBuffer envel typedef void (*SignalReceivedQueueEmpty)(void *ctx); +typedef void (*SignalConnectionInterrupted)(void *ctx); + typedef void (*SignalDestroyChatListener)(void *ctx); /** @@ -647,6 +649,7 @@ typedef struct { void *ctx; SignalReceivedIncomingMessage received_incoming_message; SignalReceivedQueueEmpty received_queue_empty; + SignalConnectionInterrupted connection_interrupted; SignalDestroyChatListener destroy; } SignalFfiChatListenerStruct; @@ -1572,6 +1575,8 @@ SignalFfiError *signal_chat_server_set_listener(const SignalTokioAsyncContext *r SignalFfiError *signal_testing_chat_service_inject_raw_server_request(const SignalChat *chat, SignalBorrowedBuffer bytes); +SignalFfiError *signal_testing_chat_service_inject_connection_interrupted(const SignalChat *chat); + SignalFfiError *signal_server_message_ack_destroy(SignalServerMessageAck *p); SignalFfiError *signal_server_message_ack_send(SignalCPromisebool *promise, const SignalTokioAsyncContext *async_runtime, const SignalServerMessageAck *ack); diff --git a/swift/Tests/LibSignalClientTests/ChatServiceTests.swift b/swift/Tests/LibSignalClientTests/ChatServiceTests.swift index 5b0961fe2..3ac93bc6d 100644 --- a/swift/Tests/LibSignalClientTests/ChatServiceTests.swift +++ b/swift/Tests/LibSignalClientTests/ChatServiceTests.swift @@ -20,6 +20,12 @@ extension ChatService { } } } + + func injectConnectionInterrupted() { + withNativeHandle { handle in + failOnError(signal_testing_chat_service_inject_connection_interrupted(handle)) + } + } } final class ChatServiceTests: TestCaseBase { @@ -155,11 +161,13 @@ final class ChatServiceTests: TestCaseBase { let queueEmpty: XCTestExpectation let firstMessageReceived: XCTestExpectation let secondMessageReceived: XCTestExpectation + let connectionInterrupted: XCTestExpectation - init(queueEmpty: XCTestExpectation, firstMessageReceived: XCTestExpectation, secondMessageReceived: XCTestExpectation) { + init(queueEmpty: XCTestExpectation, firstMessageReceived: XCTestExpectation, secondMessageReceived: XCTestExpectation, connectionInterrupted: XCTestExpectation) { self.queueEmpty = queueEmpty self.firstMessageReceived = firstMessageReceived self.secondMessageReceived = secondMessageReceived + self.connectionInterrupted = connectionInterrupted } func chatService(_ chat: ChatService, didReceiveIncomingMessage envelope: Data, serverDeliveryTimestamp: UInt64, sendAck: () async throws -> Void) { @@ -184,6 +192,12 @@ final class ChatServiceTests: TestCaseBase { self.stage += 1 self.queueEmpty.fulfill() } + + func chatServiceConnectionWasInterrupted(_: ChatService) { + XCTAssertEqual(self.stage, 3) + self.stage += 1 + self.connectionInterrupted.fulfill() + } } let net = Net(env: .staging, userAgent: Self.userAgent) @@ -191,7 +205,8 @@ final class ChatServiceTests: TestCaseBase { let listener = Listener( queueEmpty: expectation(description: "queue empty"), firstMessageReceived: expectation(description: "first message received"), - secondMessageReceived: expectation(description: "second message received") + secondMessageReceived: expectation(description: "second message received"), + connectionInterrupted: expectation(description: "connection interrupted") ) chat.setListener(listener) @@ -223,8 +238,10 @@ final class ChatServiceTests: TestCaseBase { // 4: 99 chat.injectServerRequest(base64: "CgNQVVQSEy9hcGkvdjEvcXVldWUvZW1wdHkgYw==") + chat.injectConnectionInterrupted() + waitForExpectations(timeout: 2) - XCTAssertEqual(listener.stage, 3) + XCTAssertEqual(listener.stage, 4) } #endif