WebSub

Hub

websub_hub_service_jwt_auth.bal

import ballerina/http;
import ballerina/jwt;
import ballerina/log;
import ballerina/websubhub;

// Listener the hub service is attached to. It listens on port 9090 and is
// configured with a secure socket whose certificate and private key are read
// from the local `resources` directory.
listener websubhub:Listener securedHub = new (9090,
    secureSocket = {
        key: {
            certFile: "./resources/public.crt",
            keyFile: "./resources/private.key"
        }
    }
);

// JWT auth handler used by `doAuth` to authenticate and authorize incoming
// hub requests. It is configured with the expected issuer and audience and
// with the certificate file used for the signature configuration.
final http:ListenerJwtAuthHandler handler = new ({
    issuer: "wso2",
    audience: "ballerina",
    signatureConfig: {
        certFile: "./resources/public.crt"
    },
    // Scope key configured for the handler; `doAuth` authorizes against the value "admin".
    scopeKey: "scp"
});

// Hub service attached to `securedHub`. Every publisher/subscriber-facing
// operation first runs `doAuth` on the request headers and answers with the
// operation-specific error constant when `doAuth` reports a failure.
@websubhub:ServiceConfig {
    webHookConfig: {
        secureSocket: {
            cert: "./resources/public.crt"
        }
    }
}
isolated service /websubhub on securedHub {

    // Handles a topic registration request.
    isolated remote function onRegisterTopic(websubhub:TopicRegistration msg, http:Headers headers) returns websubhub:TopicRegistrationSuccess|websubhub:TopicRegistrationError {
        if doAuth(headers) is string {
            return websubhub:TOPIC_REGISTRATION_ERROR;
        }
        // The same text is logged and sent back as the response body.
        string summary = "Registered topic: '" + msg.topic + "'.";
        log:printInfo(summary);
        websubhub:TopicRegistrationSuccess success = {
            body: summary
        };
        return success;
    }

    // Handles a topic deregistration request.
    isolated remote function onDeregisterTopic(websubhub:TopicDeregistration msg, http:Headers headers) returns websubhub:TopicDeregistrationSuccess|websubhub:TopicDeregistrationError {
        if doAuth(headers) is string {
            return websubhub:TOPIC_DEREGISTRATION_ERROR;
        }
        string summary = "Deregistered topic: '" + msg.topic + "'.";
        log:printInfo(summary);
        websubhub:TopicDeregistrationSuccess success = {
            body: summary
        };
        return success;
    }

    // Handles a subscription request.
    isolated remote function onSubscription(websubhub:Subscription msg, http:Headers headers) returns websubhub:SubscriptionAccepted|websubhub:InternalSubscriptionError {
        if doAuth(headers) is string {
            return websubhub:INTERNAL_SUBSCRIPTION_ERROR;
        }
        string summary = "Subscription accepted for topic: '" + msg.hubTopic + "'.";
        log:printInfo(summary);
        websubhub:SubscriptionAccepted accepted = {
            body: summary
        };
        return accepted;
    }

    // Invoked internally by the hub; no authentication is performed here.
    isolated remote function onSubscriptionValidation(websubhub:Subscription msg) {
        string summary = "Subscription validated for topic: '" + msg.hubTopic + "'.";
        log:printInfo(summary);
    }

    // Invoked internally by the hub; records the verified subscription.
    isolated remote function onSubscriptionIntentVerified(websubhub:VerifiedSubscription msg) {
        string verified = msg.verificationSuccess.toString();
        log:printInfo("Subscription intent verified: '" + verified + "', for topic: '" + msg.hubTopic + "'.");
        addSubscriber(msg);
    }

    // Drops the stored subscription once the unsubscription intent is verified.
    isolated remote function onUnsubscriptionIntentVerified(websubhub:VerifiedUnsubscription msg) {
        string verified = msg.verificationSuccess.toString();
        log:printInfo("Unsubscription intent verified: '" + verified + "', for topic: '" + msg.hubTopic + "'.");
        removeSubscriber(msg);
    }

    // Handles a content update and forwards the content to every subscription
    // stored for the topic. Delivery problems are logged per subscriber; the
    // update itself is always acknowledged once the request is authorized.
    isolated remote function onUpdateMessage(websubhub:UpdateMessage msg, http:Headers headers) returns websubhub:Acknowledgement|websubhub:UpdateMessageError {
        if doAuth(headers) is string {
            return websubhub:UPDATE_MESSAGE_ERROR;
        }
        // String form of the content, used for both the log line and the distribution message.
        string payload = msg.content.toString();
        log:printInfo("Message updated for message type: '" + msg.msgType.toString() + "', topic: '" + msg.hubTopic +
                        "', content-type: '" + msg.contentType + "', content: '" + payload + "'.");
        foreach websubhub:Subscription target in retrieveSubscribers(msg.hubTopic) {
            log:printInfo("Subscriber found with callback URL: '" + target.hubCallback + "'");
            websubhub:HubClient|error hubClient = new (target, {
                secureSocket: {
                    cert: "./resources/public.crt"
                }
            });
            if hubClient is error {
                log:printError("Error occurred while initializing the hub client.", 'error = hubClient);
            } else {
                websubhub:ContentDistributionMessage outbound = {
                    content: payload
                };
                websubhub:ContentDistributionSuccess|websubhub:SubscriptionDeletedError|error? outcome = hubClient->notifyContentDistribution(outbound);
                // The specific error type is tested before the generic `error`.
                if outcome is websubhub:SubscriptionDeletedError {
                    log:printError("Subscription deleted error occurred while notifying the content to subscriber.");
                } else if outcome is error {
                    log:printError("Error occurred while notifying the content to subscriber.", 'error = outcome);
                } else if outcome is websubhub:ContentDistributionSuccess {
                    log:printInfo("Successfully notified the content to subscriber.");
                }
            }
        }
        return websubhub:ACKNOWLEDGEMENT;
    }
}

# Authenticates the request headers with the module-level JWT `handler` and
# then authorizes the resulting payload against the scope value "admin".
#
# + headers - headers of the incoming hub request
# + return - `()` when the request is authenticated and authorized; otherwise
#            the error message that was also written to the log
isolated function doAuth(http:Headers headers) returns string? {
    jwt:Payload|http:Unauthorized authn = handler.authenticate(headers);
    if authn is http:Unauthorized {
        // The body is read with optional field access, so it may be absent or
        // hold a non-string value. An unchecked `<string>` cast would panic in
        // those cases; convert defensively instead.
        var reason = authn?.body;
        string detail = reason is string ? reason : reason.toString();
        string errorMsg = "Failed to authenticate the request. " + detail;
        log:printError(errorMsg);
        return errorMsg;
    }
    http:Forbidden? authz = handler.authorize(<jwt:Payload>authn, "admin");
    if authz is http:Forbidden {
        string errorMsg = "Failed to authorize the request for the scope key: 'scp' and value: 'admin'.";
        log:printError(errorMsg);
        return errorMsg;
    }
    return;
}

// In-memory subscription store keyed by hub topic. The functions below access
// it only inside `lock` statements.
isolated map<websubhub:Subscription[]> subscribersMap = {};

# Stores a read-only copy of the subscription under its hub topic, creating
# the topic's array on first use.
#
# + subscriber - subscription to record
isolated function addSubscriber(websubhub:Subscription subscriber) {
    lock {
        websubhub:Subscription[]? existing = subscribersMap[subscriber.hubTopic];
        if existing is () {
            // First subscription for this topic.
            subscribersMap[subscriber.hubTopic] = [subscriber.cloneReadOnly()];
        } else {
            existing.push(subscriber.cloneReadOnly());
        }
    }
}

# Removes the first stored subscription of the topic whose callback matches
# the unsubscription request. Does nothing when the topic is unknown or no
# stored subscription has that callback.
#
# + subscriber - unsubscription request identifying the topic and callback
isolated function removeSubscriber(websubhub:Unsubscription subscriber) {
    lock {
        if subscribersMap.hasKey(subscriber.hubTopic) {
            // `get` returns the array held by the map, so removing from it
            // updates the stored subscriptions in place.
            websubhub:Subscription[] subscribersArray = subscribersMap.get(subscriber.hubTopic);
            foreach int i in 0 ..< subscribersArray.length() {
                if subscribersArray[i].hubCallback == subscriber.hubCallback {
                    // Remove only on a match: calling `remove` with an index
                    // equal to the array length (no match found) would panic.
                    _ = subscribersArray.remove(i);
                    break;
                }
            }
        }
    }
}

# Reports whether the subscription store has an entry for the topic.
#
# + topic - hub topic to look up
# + return - `true` when the store contains the topic as a key
isolated function isTopicAvailable(string topic) returns boolean {
    lock {
        // Member access yields `()` for a missing key, so the type test is
        // true exactly when the key is present.
        return subscribersMap[topic] is websubhub:Subscription[];
    }
}

# Returns a copy of the subscriptions stored for the topic.
#
# + topic - hub topic to look up
# + return - clone of the stored subscriptions, or an empty array when the
#            store has no entry for the topic
isolated function retrieveSubscribers(string topic) returns websubhub:Subscription[] {
    lock {
        // Guard the lookup: `get` panics when the key is missing, which would
        // otherwise happen for any update published to a topic that has no
        // subscriptions yet.
        if subscribersMap.hasKey(topic) {
            return subscribersMap.get(topic).clone();
        }
    }
    return [];
}
$ bal run websub_hub_service_jwt_auth.bal

Subscriber

websub_subscriber_service_jwt_auth.bal

import ballerina/log;
import ballerina/websub;

// Listener the subscriber service is attached to. It listens on port 8080 and
// is configured with a secure socket whose certificate and private key are
// read from the local `resources` directory.
listener websub:Listener securedSubscriber = new (8080,
    secureSocket = {
        key: {
            certFile: "./resources/public.crt",
            keyFile: "./resources/private.key"
        }
    }
);

// Subscriber service attached to `securedSubscriber`. The annotation names the
// hub URL and topic to subscribe to, the callback URL, and the HTTP client
// settings (JWT issuing configuration and trusted certificate).
@websub:SubscriberServiceConfig {
    target: [
        "https://localhost:9090/websubhub", "Ballerina"
    ],
    callback: "https://localhost:8080/subscriber",
    secret: "b745e11a57e9",
    httpConfig: {
        auth: {
            username: "ballerina",
            issuer: "wso2",
            audience: ["ballerina", "ballerina.org", "ballerina.io"],
            keyId: "5a0b754-895f-4279-8843-b745e11a57e9",
            jwtId: "JlbmMiOiJBMTI4Q0JDLUhTMjU2In",
            customClaims: { "scp": "admin" },
            expTime: 3600,
            signatureConfig: {
                config: {
                    keyFile: "./resources/private.key"
                }
            }
        },
        secureSocket: {
            cert: "./resources/public.crt"
        }
    }
}
isolated service /subscriber on securedSubscriber {

    // Logs the delivered content and acknowledges it with the same text.
    isolated remote function onEventNotification(websub:ContentDistributionMessage event) returns websub:Acknowledgement {
        string summary = "Event notified with content: '" + event.content.toString() + "'.";
        log:printInfo(summary);
        return {
            body: { message: summary }
        };
    }

    // Logs the verification request and answers it with a success response
    // carrying the same text.
    isolated remote function onSubscriptionVerification(websub:SubscriptionVerification msg) returns websub:SubscriptionVerificationSuccess {
        string summary = "Subscription verification success. Hub mode: '" + msg.hubMode + "', hub topic: '" +
                         msg.hubTopic + "', hub challenge: '" + msg.hubChallenge + "'.";
        log:printInfo(summary);
        return {
            body: { message: summary }
        };
    }
}
$ bal run websub_subscriber_service_jwt_auth.bal

Publisher

websub_publisher_client_jwt_auth.bal

import ballerina/io;
import ballerina/log;
import ballerina/websubhub;

# Registers the topic "Ballerina" with the hub and then publishes one update
# to it. A failed registration or publish is logged and does not stop the
# program.
#
# + return - an error if the publisher client cannot be initialized
public function main() returns error? {
    websubhub:PublisherClient hubPublisher = check new ("https://localhost:9090/websubhub",
        auth = {
            username: "ballerina",
            issuer: "wso2",
            audience: ["ballerina", "ballerina.org", "ballerina.io"],
            keyId: "5a0b754-895f-4279-8843-b745e11a57e9",
            jwtId: "JlbmMiOiJBMTI4Q0JDLUhTMjU2In",
            customClaims: { "scp": "admin" },
            expTime: 3600,
            signatureConfig: {
                config: {
                    keyFile: "./resources/private.key"
                }
            }
        },
        secureSocket = {
            cert: "./resources/public.crt"
        }
    );

    // The same topic is used for registration and for the update.
    string topic = "Ballerina";

    websubhub:TopicRegistrationSuccess|websubhub:TopicRegistrationError registration = hubPublisher->registerTopic(topic);
    if registration is websubhub:TopicRegistrationError {
        log:printError("Topic registration failed!.", 'error = registration);
    } else {
        io:println("Topic registration successful.");
    }

    string payload = "Swan Lake GA Released!";
    websubhub:Acknowledgement|websubhub:UpdateMessageError publication = hubPublisher->publishUpdate(topic, payload);
    if publication is websubhub:UpdateMessageError {
        log:printError("Publish update failed!.", 'error = publication);
    } else {
        io:println("Publish update successful.");
    }
}
$ bal run websub_publisher_client_jwt_auth.bal

Resources

public.crt

-----BEGIN CERTIFICATE-----
MIIDdzCCAl+gAwIBAgIEfP3e8zANBgkqhkiG9w0BAQsFADBkMQswCQYDVQQGEwJV
UzELMAkGA1UECBMCQ0ExFjAUBgNVBAcTDU1vdW50YWluIFZpZXcxDTALBgNVBAoT
BFdTTzIxDTALBgNVBAsTBFdTTzIxEjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0xNzEw
MjQwNTQ3NThaFw0zNzEwMTkwNTQ3NThaMGQxCzAJBgNVBAYTAlVTMQswCQYDVQQI
EwJDQTEWMBQGA1UEBxMNTW91bnRhaW4gVmlldzENMAsGA1UEChMEV1NPMjENMAsG
A1UECxMEV1NPMjESMBAGA1UEAxMJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEAgVyi6fViVLiZKEnw59xzNi1lcYh6z9dZnug+F9gKqFIg
mdcPe+qtS7gZc1jYTjWMCbx13sFLkZqNHeDUadpmtKo3TDduOl1sqM6cz3yXb6L3
4k/leh50mzIPNmaaXxd3vOQoK4OpkgO1n32mh6+tkp3sbHmfYqDQrkVK1tmYNtPJ
ffSCLT+CuIhnJUJco7N0unax+ySZN67/AX++sJpqAhAIZJzrRi6ueN3RFCIxYDXS
MvxrEmOdn4gOC0o1Ar9u5Bp9N52sqqGbN1x6jNKi3bfUj122Hu5e+Y9KOmfbchhQ
il2P81cIi30VKgyDn5DeWEuDoYredk4+6qAZrxMw+wIDAQABozEwLzAOBgNVHQ8B
Af8EBAMCBaAwHQYDVR0OBBYEFNmtrQ36j6tUGhKrfW9qWWE7KFzMMA0GCSqGSIb3
DQEBCwUAA4IBAQAv3yOwgbtOu76eJMl1BCcgTFgaMUBZoUjK9Un6HGjKEgYz/YWS
ZFlY/qH5rT01DWQevUZB626d5ZNdzSBZRlpsxbf9IE/ursNHwHx9ua6fB7yHUCzC
1ZMp1lvBHABi7wcA+5nbV6zQ7HDmBXFhJfbgH1iVmA1KcvDeBPSJ/scRGasZ5q2W
3IenDNrfPIUhD74tFiCiqNJO91qD/LO+++3XeZzfPh8NRKkiPX7dB8WJ3YNBuQAv
gRWTISpSSXLmqMb+7MPQVgecsepZdk8CwkRLxh3RKPJMjigmCgyvkSaoDMKAYC3i
YjfUTiJ57UeqoSl0IaOFJ0wfZRFh+UytlDZa
-----END CERTIFICATE-----

private.key

-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCBXKLp9WJUuJko
SfDn3HM2LWVxiHrP11me6D4X2AqoUiCZ1w976q1LuBlzWNhONYwJvHXewUuRmo0d
4NRp2ma0qjdMN246XWyozpzPfJdvovfiT+V6HnSbMg82ZppfF3e85Cgrg6mSA7Wf
faaHr62SnexseZ9ioNCuRUrW2Zg208l99IItP4K4iGclQlyjs3S6drH7JJk3rv8B
f76wmmoCEAhknOtGLq543dEUIjFgNdIy/GsSY52fiA4LSjUCv27kGn03nayqoZs3
XHqM0qLdt9SPXbYe7l75j0o6Z9tyGFCKXY/zVwiLfRUqDIOfkN5YS4Ohit52Tj7q
oBmvEzD7AgMBAAECggEAXM/F4u23OummmQ1T1kaIMpqnaalt06jCGAywYBMUsmca
FMYDyfg5lVXkjKl1p8crTeD1AHjWawTjskgYnkmf3ocxXXF3mFBnIUX7o7HURLg7
+RcxoUgwiRiFaZZ7szX3JoLbfzzbcHNQ37kavccBVWwQsFMiU3Tlw+LbKwK6/row
LYsQPx7gT4u7hViat4vQDTYcgyjvvFCiek4ndL6O9K49MxIMU678UXB6ia5iUevy
vgEfcYkKQ5EQ38qS3ZwsubPvj4633jvAJRr/hJD8XINZC74kTXeV3BGH2LlpQOEq
kWkOypwYNjnXtt1JO8+Iu6mEXKUoiIBPfGrJ3vDSQQKBgQDmYPc7kfYan/LHjJRv
iE2CwbC26yVA6+BEPQv9z7jChO9Q6cUbGvM8EEVNpC9nmFogkslzJhz55HP84QZL
u3ptU+D96ncq6zkBqxBfRnZG++D36+XRXIwzz3h+g1Nwrl0y0MFbwlkMm3ZqJdd6
pZz1FZGd6zvQftW8m7jPSKHuswKBgQCPv6czFOZR6bI+qCQdaORpe9JGoAduOD+4
YKl96s0eiAKhkGhFCrMd6GJwWRkpNcfwB+J9sMahORbfvwiYanI56h7Vi30DFPRb
m1m8dLkr6z+8bxMxKJaMXIIjy3UDamgDr7QHInNUih2iGvtB8QqZ0aobsB2XIxZg
qESTMcpYmQKBgHSwSqneraQgvgz7FLhFdtUzHDoacr0mfGqz7R37F99XDAyUy+SF
ywvyRdgkwGodjhEPqH/tnyGn6GP+6nxzknhL0xtppkCT8kT5C4rmmsQrknChCL/5
u34GqUaTaDEb8FLrz/SVRRuQpvLvBey2dADjkuVFH//kLoig64P6iyLnAoGBAIlF
g+2L78YZXVXoS1SqbjUtQUigWXgvzunLpQ/Rwb9+MsUGmgwUg6fz2s1eyGBKM3xM
i0VsIsKjOezBCPxD6oDTyk4yvlbLE+7HE5KcBJikNmFD0RgIonu3e6+jA0MXweyD
RW/qviflHRdInNgDzxPE3KVEMX26zAvRpGrMCWdBAoGAdQ5SvX+mAC3cKqoQ9Zal
lSqWoyjfzP5EaVRG8dtoLxbznQGTTvtHXc65/MznX/L9qkWCS6Eb4HH5M3hFNY46
LNIzGQLznE1odwv7H5B8c0/m3DrKTxbh8bYcrR1BW5/nKZNNW7k1O6OjEozvAajK
JQdp3KBU9S8CmBjGrRpJ2qw=
-----END PRIVATE KEY-----