Kafka

Kafka Server

docker-compose.yml

# Compose file format version 2.
version: '2'

# Two containers are defined below: `zookeeper` and `broker`.
services:
  # ZooKeeper node that the broker connects to (see KAFKA_ZOOKEEPER_CONNECT on the broker).
  zookeeper:
    # Pinned to a fixed 7.x release instead of the floating `latest` tag so the
    # example keeps starting the same ZooKeeper build every time it is run.
    image: 'confluentinc/cp-zookeeper:7.4.0'
    hostname: zookeeper
    container_name: kafka-zookeeper
    ports:
      # Client port published to the host.
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: INFO
      # Points the JVM at the JAAS file mounted under `volumes` below.
      KAFKA_OPTS: '-Djava.security.auth.login.config=/opt/confluentinc/kafka/config/zookeeper_jaas.conf'
    volumes:
      # zookeeper_jaas.conf must sit next to this compose file.
      - ./zookeeper_jaas.conf:/opt/confluentinc/kafka/config/zookeeper_jaas.conf
  # Single Kafka broker exposing one internal and three host-facing listeners.
  broker:
    # Pinned to a fixed 7.x release instead of the floating `latest` tag: this
    # service is configured through KAFKA_ZOOKEEPER_CONNECT, and newer Confluent
    # Platform major versions no longer support ZooKeeper-based brokers, so
    # `latest` cannot be relied on to keep working with this file.
    image: 'confluentinc/cp-server:7.4.0'
    hostname: broker
    container_name: kafka-broker
    depends_on:
      - zookeeper
    ports:
      # Host ports for the EX_* listeners advertised below:
      #   9092 -> EX_PLAINTEXT, 9093 -> EX_SASL_PLAINTEXT, 9094 -> EX_SSL.
      # The internal PLAINTEXT listener (broker:29092) is not published.
      - '9092:9092'
      - '9093:9093'
      - '9094:9094'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: PLAINTEXT
      # Clients of the SSL listener must present a certificate.
      KAFKA_SSL_CLIENT_AUTH: "required"
      # NOTE(review): no listener is mapped to SASL_SSL. Port 9094 is plain SSL, so a
      # client connecting there is presumably authenticated by its certificate rather
      # than by SASL/PLAIN credentials — confirm this is what the examples intend.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,EX_PLAINTEXT://localhost:9092,EX_SASL_PLAINTEXT://localhost:9093,EX_SSL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EX_PLAINTEXT:PLAINTEXT,EX_SASL_PLAINTEXT:SASL_PLAINTEXT,EX_SSL:SSL
      # Replication factors / min ISR are all 1 because only this one broker is defined.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      # File names below correspond to the files mounted into /etc/kafka/secrets under `volumes`.
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.broker.keystore.jks
      KAFKA_SSL_KEYSTORE_CREDENTIALS: broker_keystore_credentials
      KAFKA_SSL_KEY_CREDENTIALS: broker_ssl_key_credentials
      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_credentials
      KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
      # Points the JVM at the JAAS file mounted under `volumes` below.
      KAFKA_OPTS: '-Djava.security.auth.login.config=/opt/confluentinc/kafka/config/kafka_jaas.conf'
    volumes:
      # Credential files, keystore and truststore are expected under ./secrets and ./keystore
      # next to this compose file.
      - ./secrets/broker_keystore_credentials:/etc/kafka/secrets/broker_keystore_credentials
      - ./secrets/broker_ssl_key_credentials:/etc/kafka/secrets/broker_ssl_key_credentials
      - ./secrets/broker_truststore_credentials:/etc/kafka/secrets/broker_truststore_credentials
      - ./keystore/keystore.jks:/etc/kafka/secrets/kafka.broker.keystore.jks
      - ./keystore/truststore.jks:/etc/kafka/secrets/kafka.broker.truststore.jks
      - ./kafka_jaas.conf:/opt/confluentinc/kafka/config/kafka_jaas.conf

kafka_jaas.conf

// JAAS configuration handed to the broker JVM through KAFKA_OPTS
// (-Djava.security.auth.login.config=...) in docker-compose.yml.
//
// KafkaServer: login context for the broker's SASL/PLAIN authentication.
// Per Kafka's SASL/PLAIN documentation, username/password are the broker's own
// credentials and each user_<name>="<password>" entry defines an account that
// clients may log in with; here the only account is alice / alice@123.
// NOTE(review): serviceName is normally a Kerberos setting and is presumably
// unused by PlainLoginModule — confirm before relying on or removing it.
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="test-service"
    username="alice"
    password="alice@123"
    user_alice="alice@123";
};

// Client: credentials the broker presents when it connects to ZooKeeper.
// They match the user_alice entry in zookeeper_jaas.conf.
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="alice"
    password="alice@123";
};

zookeeper_jaas.conf

// JAAS configuration handed to the ZooKeeper JVM through KAFKA_OPTS
// (-Djava.security.auth.login.config=...) on the zookeeper service in docker-compose.yml.
// user_alice="alice@123" matches the username/password in the Client section of kafka_jaas.conf.
// NOTE(review): the username/password options are presumably not consulted on the
// server side (only the user_<name> entries are) — confirm before removing them.
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="alice"
    password="alice@123"
    user_alice="alice@123";
};
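
Start ZooKeeper and the broker from the directory that contains docker-compose.yml, kafka_jaas.conf, and zookeeper_jaas.conf:

$ docker-compose -f docker-compose.yml up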

Consumer

kafka_consumer_sasl_plain_auth.bal

import ballerina/io;
import ballerinax/kafka;

// Topic the consumer subscribes to.
const string TOPIC = "demo-security";

// Kafka listener that connects to localhost:9094 over TLS, presenting
// client.crt/client.key as its own identity and trusting server.crt.
// NOTE(review): SASL/PLAIN credentials are supplied together with
// kafka:PROTOCOL_SSL; with a plain SSL security protocol the credentials are
// presumably not sent to the broker — confirm whether a SASL_SSL protocol and
// listener were intended.
listener kafka:Listener securedEP = new ("localhost:9094",
    groupId = "consumer-group-1",
    topics = [TOPIC],
    securityProtocol = kafka:PROTOCOL_SSL,
    secureSocket = {
        protocol: {
            name: kafka:TLS
        },
        key: {
            keyFile: "./resources/key/client.key",
            certFile: "./resources/cert/client.crt"
        },
        cert: "./resources/cert/server.crt"
    },
    auth = {
        username: "alice",
        password: "alice@123",
        mechanism: kafka:AUTH_SASL_PLAIN
    }
);

// Consumer service attached to the secured listener.
isolated service kafka:Service on securedEP {
    // Called with each batch of records delivered by the listener.
    // Prints every record's value as text and then commits the batch.
    // Returns an error if a value is not valid UTF-8 or if the commit fails.
    isolated remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) returns error? {
        // A batch can hold several records (or none); handle each one in order
        // instead of popping only the last element, which silently dropped the
        // rest and panicked on an empty batch.
        foreach kafka:ConsumerRecord consumerRecord in records {
            // Propagate a decoding failure instead of printing the error value.
            string message = check string:fromBytes(consumerRecord.value);
            io:println("Received message: ", message);
        }
        check caller->commit();
    }
}
$ bal run kafka_consumer_sasl_plain_auth.bal

Producer

kafka_producer_sasl_plain_auth.bal

import ballerina/io;
import ballerinax/kafka;

// Topic the message is published to, and the message payload.
const string TOPIC = "demo-security";
const string MESSAGE = "Hello, World!";

// Kafka producer that connects to localhost:9094 over TLS, presenting
// client.crt/client.key as its own identity and trusting server.crt.
// NOTE(review): SASL/PLAIN credentials are supplied together with
// kafka:PROTOCOL_SSL; with a plain SSL security protocol the credentials are
// presumably not sent to the broker — confirm whether a SASL_SSL protocol and
// listener were intended.
final kafka:Producer producer = check new ("localhost:9094",
    securityProtocol = kafka:PROTOCOL_SSL,
    secureSocket = {
        protocol: {
            name: kafka:TLS
        },
        key: {
            keyFile: "./resources/key/client.key",
            certFile: "./resources/cert/client.crt"
        },
        cert: "./resources/cert/server.crt"
    },
    auth = {
        username: "alice",
        password: "alice@123",
        mechanism: kafka:AUTH_SASL_PLAIN
    }
);

// Publishes MESSAGE to TOPIC, flushes the producer, and reports success.
// Returns the error from the send or the flush if either one fails.
public function main() returns error? {
    error? sendOutcome = producer->send({topic: TOPIC, value: MESSAGE.toBytes()});
    if sendOutcome is error {
        return sendOutcome;
    }

    error? flushOutcome = producer->'flush();
    if flushOutcome is error {
        return flushOutcome;
    }

    io:println("Message published successfully.");
}
$ bal run kafka_producer_sasl_plain_auth.bal

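Resources

The Ballerina examples above read server.crt and client.crt from ./resources/cert/ and client.key from ./resources/key/. The JKS keystore and truststore under ./keystore/ and the credential files under ./secrets/ that docker-compose.yml mounts into the broker are not listed here and have to be created separately.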

ca.crt

-----BEGIN CERTIFICATE-----
MIIDPjCCAiYCCQDiEWXPiJfzKjANBgkqhkiG9w0BAQsFADBhMQswCQYDVQQGEwJM
SzEQMA4GA1UECAwHV2VzdGVybjEQMA4GA1UEBwwHQ29sb21ibzENMAsGA1UECgwE
QVZJWDELMAkGA1UECwwCQ0ExEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yMTA2MzAx
MjU2NTBaFw0zMTA2MjgxMjU2NTBaMGExCzAJBgNVBAYTAkxLMRAwDgYDVQQIDAdX
ZXN0ZXJuMRAwDgYDVQQHDAdDb2xvbWJvMQ0wCwYDVQQKDARBVklYMQswCQYDVQQL
DAJDQTESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEA0SlFT4SbEx2zGyzPTrdJSHOPdCEd7gsy6nheyIsjHECngqVVKYwH
Jq4c3RByWJfggvy/NP+mSqi7zCn+lZY1i39TpBmYxF/rhFYIEFNDB8d8pgvZ+0AR
3eB0baaYkkKsMokTjnX43AUm1i+nMTWTTy6thIvabEyT+YgdvoFEPOzQ9UsyeaQ2
bhryuZtQFYi4k3FjSONr8eKxdpjNjOgY03yi8BRW69e+s9Sa8jOMGhhphRpPeuSf
mJMEiRMtBlHsl1QkBRJs/l/rhr9M56CkDEsevG9EQaYsvW7hb1ChU13PksJ9uFWV
mclkwxFFvGXIwV3DJqR9qe/o43usO443XQIDAQABMA0GCSqGSIb3DQEBCwUAA4IB
AQCwzA0H0FCCIuO09gMrdtZIglbsGLRRRqmfXXRWNg63vOUSqSy50JucZtijRMTA
6lKDG1xZopBtI2G5P2SVDmVgwBiJYFxWGMlkF3k6Urid3HO1fQ6ljtmx21kOCtK+
0t++t2oRSpL1qXZkaTK29H4ExpHjlaxQSi9dYvnnPa/1RiIgjzUVt4SSwSI4x8LH
KLll0aRYel/nth7+K8Ld5v56isIv1nwMCG/V/iiN60Eeho9VxXcTemWCgwsEpy0J
arX2HPHvHXGV750FS9iyPNVDRpG60XsBb0Rq3sVPfJ7yEJ8TMgGRe1ab7aE6CEnl
hMhVGtBdnjJNOHcOaGPDZJru
-----END CERTIFICATE-----

client.crt

-----BEGIN CERTIFICATE-----
MIIDgTCCAmmgAwIBAgIJAKD3AgGoknU6MA0GCSqGSIb3DQEBCwUAMGExCzAJBgNV
BAYTAkxLMRAwDgYDVQQIDAdXZXN0ZXJuMRAwDgYDVQQHDAdDb2xvbWJvMQ0wCwYD
VQQKDARBVklYMQswCQYDVQQLDAJDQTESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIx
MDYzMDEzMzIxOFoXDTMxMDYyODEzMzIxOFowZTELMAkGA1UEBhMCTEsxEDAOBgNV
BAgMB1dlc3Rlcm4xEDAOBgNVBAcMB0NvbG9tYm8xDTALBgNVBAoMBEFWSVgxDzAN
BgNVBAsMBkNsaWVudDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B
AQEFAAOCAQ8AMIIBCgKCAQEAq8s322duVHHrAwZJe8mHiyeZyrb3LzQUkuKA+LbN
IiDt+pKWZilrDo4bIbByV/X89/c9GYherEqgsBuyuZHIBAZBNZ2p/VC0eRT5GjhX
bwliMzLcMrAQr3vXy8iIgJvOWN2ABksWVmORaIIlvzhmji47O7J8LePHWqoupCCE
7jkKOk5wvpsZdKUDzFSteis7YkS4rI9+jvaNTensXYlJ80rKr30fSB2RsN3Jgo/c
GXXeovW9a6CID9Fg904g6Uwt5ioVM0owbLUkS8J5mezZQBHUxb4UzX8wC8PUcO3q
qn1JEats+kuYT90qrmVaBdNBEUlSa+T8o6QgyCXJ/h5JGwIDAQABozgwNjA0BgNV
HREELTArgglsb2NhbGhvc3SCBzAuMC4wLjCCCTEyNy4wLjAuMYcEAAAAAIcEfwAA
ATANBgkqhkiG9w0BAQsFAAOCAQEATNh/qOMm71Y0Thjq2hguIaix/3AC5bU2AjKt
OObJf8fNsUoFZwMrg5kRkxBEnHF0tAiMBW1vBBKK93pEAp0K4XBA0mgb1n9LgqOf
oDIgOKKiymm3AqhmBytjDQ1yrQRvVb8eIary0YoCCyYFAqox7hNrLkaV0x42BYJ0
BQRBj1iorJ+tog/2Bw2vt7cFv9Mm1quaiyEgmk1yMoaEtsZqbvIS1xFjmrjM+1iR
mhKa6NyPXiXracKEWo5cjm95QT7SPZ/guctdeijKOrNz/y0+LO/jTWrRozp7lwhp
IBzOaHLWGzR7SWpRRajeqZxhnRo3bzH/EuqP+JWhRz88HXBUPA==
-----END CERTIFICATE-----

server.crt

-----BEGIN CERTIFICATE-----
MIIDgTCCAmmgAwIBAgIJAKD3AgGoknU7MA0GCSqGSIb3DQEBCwUAMGExCzAJBgNV
BAYTAkxLMRAwDgYDVQQIDAdXZXN0ZXJuMRAwDgYDVQQHDAdDb2xvbWJvMQ0wCwYD
VQQKDARBVklYMQswCQYDVQQLDAJDQTESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIx
MDYzMDEzMzMwNloXDTMxMDYyODEzMzMwNlowZTELMAkGA1UEBhMCTEsxEDAOBgNV
BAgMB1dlc3Rlcm4xEDAOBgNVBAcMB0NvbG9tYm8xDTALBgNVBAoMBEFWSVgxDzAN
BgNVBAsMBlNlcnZlcjESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B
AQEFAAOCAQ8AMIIBCgKCAQEAvIIidvnwytSjDrjMsmeXtyN7BtHOdSXZ99rqpfog
vVN8nlAy2YGRXk5XK4qZFuZYMDD0J86BTRbHLb8uFlZ8YywScyiM9g+UwcSGr2+Z
35PnB7C/sBJAbPQpK7XCn4v24Tfe5yLRaaJVdg3fmpf1TNe5Sfq+vwBsI9XSkh2A
A8gZupZcDqkVFNpaxwc/9eVpF957tJLn9Qnr9R0wTKUtKrJP1GUZUD2OZhLWpJDA
UNKnUhm1rD4e0CS8z5OtN73NiUi4nmzkOOs0G3IIcz7ZpHU2hk7Y802Xmnbv9RI2
fvPk5BSshDhhw7oqaT7tTed7QTbWb+iOB4UuP9ukZSM8QwIDAQABozgwNjA0BgNV
HREELTArgglsb2NhbGhvc3SCBzAuMC4wLjCCCTEyNy4wLjAuMYcEAAAAAIcEfwAA
ATANBgkqhkiG9w0BAQsFAAOCAQEAWlr/+wq1CuVz8TKXV5pSyFEE2aGxnGPYbAWa
czuYxC0nMMQUZvPULsRixFBO+MQnYHqND3YTAxUW0qJLrb+SZhF60Q/3rlgU4tKb
bolUlaRwgvoJkOnWK777yKoHC+bNTv3NKOsRLB0J5zsjhHZu3AnooTgKmQP3Y6ks
E1gDEhYCZf9+djxzQ0n/PH1xgPXpFSOZwiiUrbxBCmqjID3C3jZsf87zJv+pMFZJ
xCiJ0QCc3D34x+DE1t2XIAc1hwu/XaXGabDVXf/yz3Hq5nEFdSrzDzmVRcBpkJUy
p4KAKYJ9ghXAcgTKAzQAjqhCoEuc1M4OKx9uTKnBZyzpoKGy3g==
-----END CERTIFICATE-----

client.key

-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCryzfbZ25UcesD
Bkl7yYeLJ5nKtvcvNBSS4oD4ts0iIO36kpZmKWsOjhshsHJX9fz39z0ZiF6sSqCw
G7K5kcgEBkE1nan9ULR5FPkaOFdvCWIzMtwysBCve9fLyIiAm85Y3YAGSxZWY5Fo
giW/OGaOLjs7snwt48daqi6kIITuOQo6TnC+mxl0pQPMVK16KztiRLisj36O9o1N
6exdiUnzSsqvfR9IHZGw3cmCj9wZdd6i9b1roIgP0WD3TiDpTC3mKhUzSjBstSRL
wnmZ7NlAEdTFvhTNfzALw9Rw7eqqfUkRq2z6S5hP3SquZVoF00ERSVJr5PyjpCDI
Jcn+HkkbAgMBAAECggEAG5bmYJEK0fr9kB3JXVzvLLTuaPTbW7RUfYZsygSpiwqY
zX4PM1cBf3Po/Y8VvfSebMBDCr+w3KCbj8zS9iRoZ+UNQewRKcJ1SWDJ5KEZwZZT
M/++B1HpTBVqgaqx6V2cVP81mU0jyDnzV0JXh48qQTlBgAPZF5GWKYRdOzSRxKQX
Cd1wDOkvWcsfb4u4JkMFftHDt0F1j8w28FXzJZvLOmGWL1W9QN+Pw5ybB6tVjlcH
62x59r5fdsSVJmz4lPc3oPljvw4ldZrbtyjMNl3cmpAry+L2yETFof2/e+v6kS0N
zwf7QqphWp2YI9ocp6Q0A2iJeUtrMsWMzmfMFk4+OQKBgQDa1QgdAEHMqBTHJoEP
3g9r4XgrfsUnwldJRMlDwUUFxfclYQZM4qR5Sh6wsNOgK86OQhxVB3XmWiKGlI9Q
PBnTycwnx9U4yuKQaIAKHA81F7ypZiaMsyT4lO0yszHSdamP7kWmt6zSG7Ymh8ez
rZSUL/dPTwFmGkTkE+JorUVslQKBgQDI+O2lumDh5lvQr5OOYlAcapwvK3m2bmZt
4dk8BM78DhN9+mGZDVSxSrFpa01XAf9n3QI8mr0+9Gk32+U+0BSwEFOY0DrY2F8V
h3OuPblgVabpEUDT/DPizS0KPa/fXBG/UHLtFqDqCBrJpKCa2I1koDfw03Xa1MFX
m0TNn7HC7wKBgQCuGiFLWYIAPpXRYEv8gNLKJOeDy2c2W8MwUt7B3kl472jTP19Q
2ly69wJ11yl33IKI4gZCjB38OCgR5K4kyrxNqNaZq75zYc9hzxJXeL2MoycBq3ju
JjU65NnWpYT1CZHvJkYZ/zYrL8r62rpCtM5qG8NqQNQ/FJVM1JkkaP68BQKBgDUJ
NDXrJIwY7Zv56bfGjBujqsnr8DfLlK5kzp4/Kpxr3or9W2o3kWUkMAOA9K0EcvFm
6qMxtsV5XJcIKcxUv1DjThxiv1TgbN9Smp2d3mDk2aJiEjifMtYkjUFJnDMWqfxR
77JPqXF5aBBUqKmUgmPOwFSGiXCReuByskqNXSzzAoGBAKAC6xRGW+mgSq/GlUI6
ltbKjkzLkfY3M7eflUHtedLRpID0DRMgQPMCWlUefcK8qb7HvivhryWyZDHTnI/+
1KvgHhfUwFvrGtG7/omf+x+weav5cYevs1+pJt2PJMHFlFHeBJbh2tBCvEv9DgXM
OtjyRXr703tZjlsqHM/MvRnL
-----END PRIVATE KEY-----

server.key

-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC8giJ2+fDK1KMO
uMyyZ5e3I3sG0c51Jdn32uql+iC9U3yeUDLZgZFeTlcripkW5lgwMPQnzoFNFsct
vy4WVnxjLBJzKIz2D5TBxIavb5nfk+cHsL+wEkBs9CkrtcKfi/bhN97nItFpolV2
Dd+al/VM17lJ+r6/AGwj1dKSHYADyBm6llwOqRUU2lrHBz/15WkX3nu0kuf1Cev1
HTBMpS0qsk/UZRlQPY5mEtakkMBQ0qdSGbWsPh7QJLzPk603vc2JSLiebOQ46zQb
cghzPtmkdTaGTtjzTZeadu/1EjZ+8+TkFKyEOGHDuippPu1N53tBNtZv6I4HhS4/
26RlIzxDAgMBAAECggEADV6NYzLGyRvblKe2nNCMCgkSKKGoqtZMZy+3hJ4AxGyv
Yer+MD7oOHAF7qidwuoDRyexqUEUGouLBMuSUTNBDyeJB3HQzYK9YiC2DKs1lo2+
zOTUM4CX4SJP6TGy4+3hTIfR30Kw53UD6qOBQE2WsCR8l6qY7KLzCHeZv5MoCRyA
zH5BLRscqWrqTLCaXl1ucrxBDy6MjFHdcaDNg6E6wawd+CCKEO9SUc4HpFu3a1F3
/XnBCdRZvhMssSL4n89u14a+wjhGBs1fSEv9uelYhE2u1nPz2PTwwb/4Kphkswq6
OlPlH+MyA6VHOKZaHyoSvsx8oY8KpUiZNaTXs8hJAQKBgQDfX1POfvgDS21k0m4C
yW4vvunF0pwDj29elcQeOh6VYLNuUXCA/G4s3WwyD2HBL99uyfHFtZAqmV+UPV1B
TkkdVvvKMRS9OVJdZ6El+0S/Om5jGKtpqBdgojIu6JCPg/4DXOnyO+MP58SRN9Fv
aClNP97mR85rsJhhNyyLN8Q20wKBgQDYCx6M74K1AdkmATd5J6FCJpOQrm751Elh
jwgLOAwwhjPGZmW2piEoMPLKWDFuqJytWq0WGaHcSXIrawQYCn/HKoCUDAVNYhvO
5k6R82bNb2nK1CYsDJmzS/JAr/IMkwF8yc3JfvPWjK/SlnaXlxTgaH2kekuxTX5J
03nt9kte0QKBgHUVUqh6hSW087JVWrS7QLZmK8M1kfGLzaSWfCYb8Mv71BGPPpld
t6JsrbnPtj29CjDYSZKIiPjLS/437Bm/HzPx1Ck3cNd+pF2xZBa0jmRuDE6FL9RB
937/ha5w2NVhKlQ476e5HFvJ8I7t4ymyP3PudrfeC6UG+9h5Qy/2GzNhAoGBAJ0G
p/Pj+eq6Jyy6YamLpmfARi04u6yWIafpH/2XhTSXtnYT4wX4hc0CltV/eJ5nq0lM
yNVxV/YW5Aaux9D5gJowSDhS7F+sWW3uc1kdwrC6s/gaboxBtE1fe/qMZ6/AAarD
uvEjdvOQWhMEDbpss13QVNIBmZaLLqlWLh9JjsqhAoGAWs/VIQ3pxf8X6HHEDkWH
NL9mClNT3xfMTvZsndMUjHge6rbqgrsipGgbiz9UHD7ftCG13bWEJNor7WsucWEK
H40WeHpKCXUJR85Hng4olJ2kbGGVeRJNqIbyCFw9JTRgS6ZMG+U9SacfCV8uGxRt
JnU71wydqJBv9xqPeEyrbQs=
-----END PRIVATE KEY-----