WebSub
Hub
websub_hub_service_jwt_auth.bal
import ballerina/http;
import ballerina/jwt;
import ballerina/log;
import ballerina/websubhub;
// Hub listener bound to port 9090. Its `secureSocket` configuration points at
// the certificate and private key files under ./resources.
listener websubhub:Listener securedHub = new (9090,
secureSocket = {
key: {
certFile: "./resources/public.crt",
keyFile: "./resources/private.key"
}
}
);
// JWT auth handler used by `doAuth` to authenticate and authorize requests.
// It is configured with issuer "wso2" and audience "ballerina", uses
// public.crt for the signature configuration, and reads scopes from the
// "scp" claim.
final http:ListenerJwtAuthHandler handler = new ({
issuer: "wso2",
audience: "ballerina",
signatureConfig: {
certFile: "./resources/public.crt"
},
scopeKey: "scp"
});
// Hub service attached to `securedHub`. The remote methods that receive
// `http:Headers` run `doAuth` first and answer with the matching error value
// when it reports a failure message.
@websubhub:ServiceConfig {
    webHookConfig: {
        secureSocket: {
            cert: "./resources/public.crt"
        }
    }
}
isolated service /websubhub on securedHub {

    // Logs the topic registration and echoes the same text in the response body.
    isolated remote function onRegisterTopic(websubhub:TopicRegistration msg, http:Headers headers) returns websubhub:TopicRegistrationSuccess|websubhub:TopicRegistrationError {
        if doAuth(headers) is string {
            return websubhub:TOPIC_REGISTRATION_ERROR;
        }
        string summary = "Registered topic: '" + msg.topic + "'.";
        log:printInfo(summary);
        websubhub:TopicRegistrationSuccess success = {
            body: summary
        };
        return success;
    }

    // Logs the topic deregistration and echoes the same text in the response body.
    isolated remote function onDeregisterTopic(websubhub:TopicDeregistration msg, http:Headers headers) returns websubhub:TopicDeregistrationSuccess|websubhub:TopicDeregistrationError {
        if doAuth(headers) is string {
            return websubhub:TOPIC_DEREGISTRATION_ERROR;
        }
        string summary = "Deregistered topic: '" + msg.topic + "'.";
        log:printInfo(summary);
        websubhub:TopicDeregistrationSuccess success = {
            body: summary
        };
        return success;
    }

    // Accepts a subscription request once the caller passes `doAuth`.
    isolated remote function onSubscription(websubhub:Subscription msg, http:Headers headers) returns websubhub:SubscriptionAccepted|websubhub:InternalSubscriptionError {
        if doAuth(headers) is string {
            return websubhub:INTERNAL_SUBSCRIPTION_ERROR;
        }
        string summary = "Subscription accepted for topic: '" + msg.hubTopic + "'.";
        log:printInfo(summary);
        websubhub:SubscriptionAccepted accepted = {
            body: summary
        };
        return accepted;
    }

    // Internal call from Hub itself
    isolated remote function onSubscriptionValidation(websubhub:Subscription msg) {
        log:printInfo("Subscription validated for topic: '" + msg.hubTopic + "'.");
    }

    // Internal call from Hub itself; stores the verified subscription.
    isolated remote function onSubscriptionIntentVerified(websubhub:VerifiedSubscription msg) {
        string verified = msg.verificationSuccess.toString();
        log:printInfo("Subscription intent verified: '" + verified + "', for topic: '" + msg.hubTopic + "'.");
        addSubscriber(msg);
    }

    // Drops the stored subscription after the unsubscription intent is verified.
    isolated remote function onUnsubscriptionIntentVerified(websubhub:VerifiedUnsubscription msg) {
        string verified = msg.verificationSuccess.toString();
        log:printInfo("Unsubscription intent verified: '" + verified + "', for topic: '" + msg.hubTopic + "'.");
        removeSubscriber(msg);
    }

    // Distributes a published update to every subscription stored for the topic.
    // Per-subscriber failures are logged only; the update is acknowledged either way.
    isolated remote function onUpdateMessage(websubhub:UpdateMessage msg, http:Headers headers) returns websubhub:Acknowledgement|websubhub:UpdateMessageError {
        if doAuth(headers) is string {
            return websubhub:UPDATE_MESSAGE_ERROR;
        }
        log:printInfo("Message updated for message type: '" + msg.msgType.toString() + "', topic: '" + msg.hubTopic +
            "', content-type: '" + msg.contentType + "', content: '" + msg.content.toString() + "'.");

        foreach websubhub:Subscription target in retrieveSubscribers(msg.hubTopic) {
            log:printInfo("Subscriber found with callback URL: '" + target.hubCallback + "'");
            websubhub:HubClient|error hubClient = new (target, {
                secureSocket: {
                    cert: "./resources/public.crt"
                }
            });
            if hubClient is error {
                log:printError("Error occurred while initializing the hub client.", 'error = hubClient);
            } else {
                websubhub:ContentDistributionMessage outbound = {
                    content: msg.content.toString()
                };
                websubhub:ContentDistributionSuccess|websubhub:SubscriptionDeletedError|error? outcome = hubClient->notifyContentDistribution(outbound);
                // The specific error type is tested before the general `error`.
                if outcome is websubhub:SubscriptionDeletedError {
                    log:printError("Subscription deleted error occurred while notifying the content to subscriber.");
                } else if outcome is error {
                    log:printError("Error occurred while notifying the content to subscriber.", 'error = outcome);
                } else if outcome is websubhub:ContentDistributionSuccess {
                    log:printInfo("Successfully notified the content to subscriber.");
                }
            }
        }
        return websubhub:ACKNOWLEDGEMENT;
    }
}
// Authenticates the request headers with the JWT `handler` and then checks
// the "admin" scope. Returns `()` when both checks pass; otherwise logs and
// returns a message describing the failure.
isolated function doAuth(http:Headers headers) returns string? {
    string? failure = ();
    jwt:Payload|http:Unauthorized authnOutcome = handler.authenticate(headers);
    if authnOutcome is http:Unauthorized {
        failure = "Failed to authenticate the request. " + <string>authnOutcome?.body;
    } else {
        http:Forbidden? authzOutcome = handler.authorize(<jwt:Payload>authnOutcome, "admin");
        if authzOutcome is http:Forbidden {
            failure = "Failed to authorize the request for the scope key: 'scp' and value: 'admin'.";
        }
    }
    if failure is string {
        log:printError(failure);
    }
    return failure;
}
// Subscriptions grouped by topic: keys are `hubTopic` values, each mapped to
// the array of subscriptions stored for that topic. It is an isolated
// variable, and every function in this file that touches it does so inside a
// `lock` block.
isolated map<websubhub:Subscription[]> subscribersMap = {};
// Stores a read-only clone of `subscriber` under its topic, creating the
// topic's array on first use.
isolated function addSubscriber(websubhub:Subscription subscriber) {
    lock {
        string topic = subscriber.hubTopic;
        if !subscribersMap.hasKey(topic) {
            subscribersMap[topic] = [subscriber.cloneReadOnly()];
        } else {
            subscribersMap.get(topic).push(subscriber.cloneReadOnly());
        }
    }
}
// Removes the first stored subscription for `subscriber.hubTopic` whose
// callback URL matches `subscriber.hubCallback`.
//
// Does nothing when the topic is unknown or no stored subscription matches.
// Previously the no-match case called `remove` with an index equal to the
// array length, which panics.
isolated function removeSubscriber(websubhub:Unsubscription subscriber) {
    lock {
        if !subscribersMap.hasKey(subscriber.hubTopic) {
            return;
        }
        websubhub:Subscription[] subscribersArray = subscribersMap.get(subscriber.hubTopic);
        int? matchIndex = ();
        foreach int i in 0 ..< subscribersArray.length() {
            if subscribersArray[i].hubCallback == subscriber.hubCallback {
                matchIndex = i;
                break;
            }
        }
        if matchIndex is int {
            // `subscribersArray` is the array held by the map, so removing in
            // place updates the stored list; no re-assignment is needed.
            _ = subscribersArray.remove(matchIndex);
        }
    }
}
// Reports whether `subscribersMap` currently has an entry for `topic`.
isolated function isTopicAvailable(string topic) returns boolean {
    lock {
        boolean known = subscribersMap.hasKey(topic);
        return known;
    }
}
// Returns a copy of the subscriptions stored for `topic`.
//
// Returns an empty array when the topic has no entry. Previously
// `subscribersMap.get(topic)` panicked in that case, so publishing to a
// topic without subscribers crashed the caller.
isolated function retrieveSubscribers(string topic) returns websubhub:Subscription[] {
    lock {
        if !subscribersMap.hasKey(topic) {
            return [];
        }
        // Clone so the caller never holds a reference into the isolated map.
        return subscribersMap.get(topic).clone();
    }
}
$ bal run websub_hub_service_jwt_auth.bal
Subscriber
websub_subscriber_service_jwt_auth.bal
import ballerina/log;
import ballerina/websub;
// Subscriber listener bound to port 8080. Its `secureSocket` configuration
// points at the certificate and private key files under ./resources.
listener websub:Listener securedSubscriber = new (8080,
secureSocket = {
key: {
certFile: "./resources/public.crt",
keyFile: "./resources/private.key"
}
}
);
// Subscriber service for the "Ballerina" topic on the hub at
// https://localhost:9090/websubhub. The `httpConfig.auth` section configures
// a self-signed JWT carrying the custom claim "scp": "admin".
@websub:SubscriberServiceConfig {
    target: [
        "https://localhost:9090/websubhub", "Ballerina"
    ],
    callback: "https://localhost:8080/subscriber",
    secret: "b745e11a57e9",
    httpConfig: {
        auth: {
            username: "ballerina",
            issuer: "wso2",
            audience: ["ballerina", "ballerina.org", "ballerina.io"],
            keyId: "5a0b754-895f-4279-8843-b745e11a57e9",
            jwtId: "JlbmMiOiJBMTI4Q0JDLUhTMjU2In",
            customClaims: { "scp": "admin" },
            expTime: 3600,
            signatureConfig: {
                config: {
                    keyFile: "./resources/private.key"
                }
            }
        },
        secureSocket: {
            cert: "./resources/public.crt"
        }
    }
}
isolated service /subscriber on securedSubscriber {

    // Logs the delivered content and acknowledges it with the same text.
    isolated remote function onEventNotification(websub:ContentDistributionMessage event) returns websub:Acknowledgement {
        string summary = "Event notified with content: '" + event.content.toString() + "'.";
        log:printInfo(summary);
        websub:Acknowledgement ack = {
            body: { message: summary }
        };
        return ack;
    }

    // Logs the verification request and answers with a success response
    // carrying the same text.
    isolated remote function onSubscriptionVerification(websub:SubscriptionVerification msg) returns websub:SubscriptionVerificationSuccess {
        string summary = "Subscription verification success. Hub mode: '" + msg.hubMode + "', hub topic: '" +
            msg.hubTopic + "', hub challenge: '" + msg.hubChallenge + "'.";
        log:printInfo(summary);
        websub:SubscriptionVerificationSuccess verified = {
            body: { message: summary }
        };
        return verified;
    }
}
$ bal run websub_subscriber_service_jwt_auth.bal
Publisher
websub_publisher_client_jwt_auth.bal
import ballerina/io;
import ballerina/log;
import ballerina/websubhub;
// Publisher entry point: creates a publisher client for the hub at
// https://localhost:9090/websubhub with self-signed JWT auth, registers the
// "Ballerina" topic, and publishes one update to it. A failed registration or
// publish is logged; only a client initialization error is returned.
public function main() returns error? {
    websubhub:PublisherClient hubPublisher = check new ("https://localhost:9090/websubhub",
        auth = {
            username: "ballerina",
            issuer: "wso2",
            audience: ["ballerina", "ballerina.org", "ballerina.io"],
            keyId: "5a0b754-895f-4279-8843-b745e11a57e9",
            jwtId: "JlbmMiOiJBMTI4Q0JDLUhTMjU2In",
            customClaims: { "scp": "admin" },
            expTime: 3600,
            signatureConfig: {
                config: {
                    keyFile: "./resources/private.key"
                }
            }
        },
        secureSocket = {
            cert: "./resources/public.crt"
        }
    );

    websubhub:TopicRegistrationSuccess|websubhub:TopicRegistrationError registration = hubPublisher->registerTopic("Ballerina");
    if registration is websubhub:TopicRegistrationError {
        log:printError("Topic registration failed!.", 'error = registration);
    } else {
        io:println("Topic registration successful.");
    }

    string update = "Swan Lake GA Released!";
    websubhub:Acknowledgement|websubhub:UpdateMessageError publication = hubPublisher->publishUpdate("Ballerina", update);
    if publication is websubhub:UpdateMessageError {
        log:printError("Publish update failed!.", 'error = publication);
    } else {
        io:println("Publish update successful.");
    }
}
$ bal run websub_publisher_client_jwt_auth.bal
Resources
public.crt
-----BEGIN CERTIFICATE-----
MIIDdzCCAl+gAwIBAgIEfP3e8zANBgkqhkiG9w0BAQsFADBkMQswCQYDVQQGEwJV
UzELMAkGA1UECBMCQ0ExFjAUBgNVBAcTDU1vdW50YWluIFZpZXcxDTALBgNVBAoT
BFdTTzIxDTALBgNVBAsTBFdTTzIxEjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0xNzEw
MjQwNTQ3NThaFw0zNzEwMTkwNTQ3NThaMGQxCzAJBgNVBAYTAlVTMQswCQYDVQQI
EwJDQTEWMBQGA1UEBxMNTW91bnRhaW4gVmlldzENMAsGA1UEChMEV1NPMjENMAsG
A1UECxMEV1NPMjESMBAGA1UEAxMJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEAgVyi6fViVLiZKEnw59xzNi1lcYh6z9dZnug+F9gKqFIg
mdcPe+qtS7gZc1jYTjWMCbx13sFLkZqNHeDUadpmtKo3TDduOl1sqM6cz3yXb6L3
4k/leh50mzIPNmaaXxd3vOQoK4OpkgO1n32mh6+tkp3sbHmfYqDQrkVK1tmYNtPJ
ffSCLT+CuIhnJUJco7N0unax+ySZN67/AX++sJpqAhAIZJzrRi6ueN3RFCIxYDXS
MvxrEmOdn4gOC0o1Ar9u5Bp9N52sqqGbN1x6jNKi3bfUj122Hu5e+Y9KOmfbchhQ
il2P81cIi30VKgyDn5DeWEuDoYredk4+6qAZrxMw+wIDAQABozEwLzAOBgNVHQ8B
Af8EBAMCBaAwHQYDVR0OBBYEFNmtrQ36j6tUGhKrfW9qWWE7KFzMMA0GCSqGSIb3
DQEBCwUAA4IBAQAv3yOwgbtOu76eJMl1BCcgTFgaMUBZoUjK9Un6HGjKEgYz/YWS
ZFlY/qH5rT01DWQevUZB626d5ZNdzSBZRlpsxbf9IE/ursNHwHx9ua6fB7yHUCzC
1ZMp1lvBHABi7wcA+5nbV6zQ7HDmBXFhJfbgH1iVmA1KcvDeBPSJ/scRGasZ5q2W
3IenDNrfPIUhD74tFiCiqNJO91qD/LO+++3XeZzfPh8NRKkiPX7dB8WJ3YNBuQAv
gRWTISpSSXLmqMb+7MPQVgecsepZdk8CwkRLxh3RKPJMjigmCgyvkSaoDMKAYC3i
YjfUTiJ57UeqoSl0IaOFJ0wfZRFh+UytlDZa
-----END CERTIFICATE-----
private.key
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCBXKLp9WJUuJko
SfDn3HM2LWVxiHrP11me6D4X2AqoUiCZ1w976q1LuBlzWNhONYwJvHXewUuRmo0d
4NRp2ma0qjdMN246XWyozpzPfJdvovfiT+V6HnSbMg82ZppfF3e85Cgrg6mSA7Wf
faaHr62SnexseZ9ioNCuRUrW2Zg208l99IItP4K4iGclQlyjs3S6drH7JJk3rv8B
f76wmmoCEAhknOtGLq543dEUIjFgNdIy/GsSY52fiA4LSjUCv27kGn03nayqoZs3
XHqM0qLdt9SPXbYe7l75j0o6Z9tyGFCKXY/zVwiLfRUqDIOfkN5YS4Ohit52Tj7q
oBmvEzD7AgMBAAECggEAXM/F4u23OummmQ1T1kaIMpqnaalt06jCGAywYBMUsmca
FMYDyfg5lVXkjKl1p8crTeD1AHjWawTjskgYnkmf3ocxXXF3mFBnIUX7o7HURLg7
+RcxoUgwiRiFaZZ7szX3JoLbfzzbcHNQ37kavccBVWwQsFMiU3Tlw+LbKwK6/row
LYsQPx7gT4u7hViat4vQDTYcgyjvvFCiek4ndL6O9K49MxIMU678UXB6ia5iUevy
vgEfcYkKQ5EQ38qS3ZwsubPvj4633jvAJRr/hJD8XINZC74kTXeV3BGH2LlpQOEq
kWkOypwYNjnXtt1JO8+Iu6mEXKUoiIBPfGrJ3vDSQQKBgQDmYPc7kfYan/LHjJRv
iE2CwbC26yVA6+BEPQv9z7jChO9Q6cUbGvM8EEVNpC9nmFogkslzJhz55HP84QZL
u3ptU+D96ncq6zkBqxBfRnZG++D36+XRXIwzz3h+g1Nwrl0y0MFbwlkMm3ZqJdd6
pZz1FZGd6zvQftW8m7jPSKHuswKBgQCPv6czFOZR6bI+qCQdaORpe9JGoAduOD+4
YKl96s0eiAKhkGhFCrMd6GJwWRkpNcfwB+J9sMahORbfvwiYanI56h7Vi30DFPRb
m1m8dLkr6z+8bxMxKJaMXIIjy3UDamgDr7QHInNUih2iGvtB8QqZ0aobsB2XIxZg
qESTMcpYmQKBgHSwSqneraQgvgz7FLhFdtUzHDoacr0mfGqz7R37F99XDAyUy+SF
ywvyRdgkwGodjhEPqH/tnyGn6GP+6nxzknhL0xtppkCT8kT5C4rmmsQrknChCL/5
u34GqUaTaDEb8FLrz/SVRRuQpvLvBey2dADjkuVFH//kLoig64P6iyLnAoGBAIlF
g+2L78YZXVXoS1SqbjUtQUigWXgvzunLpQ/Rwb9+MsUGmgwUg6fz2s1eyGBKM3xM
i0VsIsKjOezBCPxD6oDTyk4yvlbLE+7HE5KcBJikNmFD0RgIonu3e6+jA0MXweyD
RW/qviflHRdInNgDzxPE3KVEMX26zAvRpGrMCWdBAoGAdQ5SvX+mAC3cKqoQ9Zal
lSqWoyjfzP5EaVRG8dtoLxbznQGTTvtHXc65/MznX/L9qkWCS6Eb4HH5M3hFNY46
LNIzGQLznE1odwv7H5B8c0/m3DrKTxbh8bYcrR1BW5/nKZNNW7k1O6OjEozvAajK
JQdp3KBU9S8CmBjGrRpJ2qw=
-----END PRIVATE KEY-----