Kafka
Kafka Server
docker-compose.yml
version: '2'
services:
zookeeper:
image: 'confluentinc/cp-zookeeper:latest'
hostname: zookeeper
container_name: kafka-zookeeper
ports:
- '2181:2181'
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: INFO
KAFKA_OPTS: '-Djava.security.auth.login.config=/opt/confluentinc/kafka/config/zookeeper_jaas.conf'
volumes:
- ./zookeeper_jaas.conf:/opt/confluentinc/kafka/config/zookeeper_jaas.conf
broker:
image: 'confluentinc/cp-server:latest'
hostname: broker
container_name: kafka-broker
depends_on:
- zookeeper
ports:
- '9092:9092'
- '9093:9093'
- '9094:9094'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: PLAINTEXT
KAFKA_SSL_CLIENT_AUTH: "required"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,EX_PLAINTEXT://localhost:9092,EX_SASL_PLAINTEXT://localhost:9093,EX_SSL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EX_PLAINTEXT:PLAINTEXT,EX_SASL_PLAINTEXT:SASL_PLAINTEXT,EX_SSL:SSL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_SSL_KEYSTORE_FILENAME: kafka.broker.keystore.jks
KAFKA_SSL_KEYSTORE_CREDENTIALS: broker_keystore_credentials
KAFKA_SSL_KEY_CREDENTIALS: broker_ssl_key_credentials
KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker.truststore.jks
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_credentials
KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
KAFKA_OPTS: '-Djava.security.auth.login.config=/opt/confluentinc/kafka/config/kafka_jaas.conf'
volumes:
- ./secrets/broker_keystore_credentials:/etc/kafka/secrets/broker_keystore_credentials
- ./secrets/broker_ssl_key_credentials:/etc/kafka/secrets/broker_ssl_key_credentials
- ./secrets/broker_truststore_credentials:/etc/kafka/secrets/broker_truststore_credentials
- ./keystore/keystore.jks:/etc/kafka/secrets/kafka.broker.keystore.jks
- ./keystore/truststore.jks:/etc/kafka/secrets/kafka.broker.truststore.jks
- ./kafka_jaas.conf:/opt/confluentinc/kafka/config/kafka_jaas.conf
kafka_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="test-service"
username="alice"
password="alice@123"
user_alice="alice@123";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="alice"
password="alice@123";
};
zookeeper_jaas.conf
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="alice"
password="alice@123"
user_alice="alice@123";
};
$ docker-compose -f docker-compose.yml up
Consumer
kafka_consumer_sasl_plain_auth.bal
import ballerina/io;
import ballerinax/kafka;
const string TOPIC = "demo-security";
listener kafka:Listener securedEP = new ("localhost:9094",
topics = [TOPIC],
groupId = "consumer-group-1",
auth = {
mechanism: kafka:AUTH_SASL_PLAIN,
username: "alice",
password: "alice@123"
},
securityProtocol = kafka:PROTOCOL_SSL,
secureSocket = {
cert: "./resources/cert/server.crt",
key: {
certFile: "./resources/cert/client.crt",
keyFile: "./resources/key/client.key"
},
protocol: {
name: kafka:TLS
}
}
);
isolated service kafka:Service on securedEP {
isolated remote function onConsumerRecord(kafka:Caller caller, kafka:ConsumerRecord[] records) returns error? {
io:println("Received message: ", string:fromBytes(records.pop().value));
check caller->commit();
}
}
$ bal run kafka_consumer_sasl_plain_auth.bal
Producer
kafka_producer_sasl_plain_auth.bal
import ballerina/io;
import ballerinax/kafka;
const string TOPIC = "demo-security";
const string MESSAGE = "Hello, World!";
final kafka:Producer producer = check new ("localhost:9094",
auth = {
mechanism: kafka:AUTH_SASL_PLAIN,
username: "alice",
password: "alice@123"
},
securityProtocol = kafka:PROTOCOL_SSL,
secureSocket = {
cert: "./resources/cert/server.crt",
key: {
certFile: "./resources/cert/client.crt",
keyFile: "./resources/key/client.key"
},
protocol: {
name: kafka:TLS
}
}
);
public function main() returns error? {
check producer->send({ topic: TOPIC, value: MESSAGE.toBytes() });
check producer->'flush();
io:println("Message published successfully.");
}
$ bal run kafka_producer_sasl_plain_auth.bal
Resources
ca.crt
-----BEGIN CERTIFICATE-----
MIIDPjCCAiYCCQDiEWXPiJfzKjANBgkqhkiG9w0BAQsFADBhMQswCQYDVQQGEwJM
SzEQMA4GA1UECAwHV2VzdGVybjEQMA4GA1UEBwwHQ29sb21ibzENMAsGA1UECgwE
QVZJWDELMAkGA1UECwwCQ0ExEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yMTA2MzAx
MjU2NTBaFw0zMTA2MjgxMjU2NTBaMGExCzAJBgNVBAYTAkxLMRAwDgYDVQQIDAdX
ZXN0ZXJuMRAwDgYDVQQHDAdDb2xvbWJvMQ0wCwYDVQQKDARBVklYMQswCQYDVQQL
DAJDQTESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEA0SlFT4SbEx2zGyzPTrdJSHOPdCEd7gsy6nheyIsjHECngqVVKYwH
Jq4c3RByWJfggvy/NP+mSqi7zCn+lZY1i39TpBmYxF/rhFYIEFNDB8d8pgvZ+0AR
3eB0baaYkkKsMokTjnX43AUm1i+nMTWTTy6thIvabEyT+YgdvoFEPOzQ9UsyeaQ2
bhryuZtQFYi4k3FjSONr8eKxdpjNjOgY03yi8BRW69e+s9Sa8jOMGhhphRpPeuSf
mJMEiRMtBlHsl1QkBRJs/l/rhr9M56CkDEsevG9EQaYsvW7hb1ChU13PksJ9uFWV
mclkwxFFvGXIwV3DJqR9qe/o43usO443XQIDAQABMA0GCSqGSIb3DQEBCwUAA4IB
AQCwzA0H0FCCIuO09gMrdtZIglbsGLRRRqmfXXRWNg63vOUSqSy50JucZtijRMTA
6lKDG1xZopBtI2G5P2SVDmVgwBiJYFxWGMlkF3k6Urid3HO1fQ6ljtmx21kOCtK+
0t++t2oRSpL1qXZkaTK29H4ExpHjlaxQSi9dYvnnPa/1RiIgjzUVt4SSwSI4x8LH
KLll0aRYel/nth7+K8Ld5v56isIv1nwMCG/V/iiN60Eeho9VxXcTemWCgwsEpy0J
arX2HPHvHXGV750FS9iyPNVDRpG60XsBb0Rq3sVPfJ7yEJ8TMgGRe1ab7aE6CEnl
hMhVGtBdnjJNOHcOaGPDZJru
-----END CERTIFICATE-----
client.crt
-----BEGIN CERTIFICATE-----
MIIDgTCCAmmgAwIBAgIJAKD3AgGoknU6MA0GCSqGSIb3DQEBCwUAMGExCzAJBgNV
BAYTAkxLMRAwDgYDVQQIDAdXZXN0ZXJuMRAwDgYDVQQHDAdDb2xvbWJvMQ0wCwYD
VQQKDARBVklYMQswCQYDVQQLDAJDQTESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIx
MDYzMDEzMzIxOFoXDTMxMDYyODEzMzIxOFowZTELMAkGA1UEBhMCTEsxEDAOBgNV
BAgMB1dlc3Rlcm4xEDAOBgNVBAcMB0NvbG9tYm8xDTALBgNVBAoMBEFWSVgxDzAN
BgNVBAsMBkNsaWVudDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B
AQEFAAOCAQ8AMIIBCgKCAQEAq8s322duVHHrAwZJe8mHiyeZyrb3LzQUkuKA+LbN
IiDt+pKWZilrDo4bIbByV/X89/c9GYherEqgsBuyuZHIBAZBNZ2p/VC0eRT5GjhX
bwliMzLcMrAQr3vXy8iIgJvOWN2ABksWVmORaIIlvzhmji47O7J8LePHWqoupCCE
7jkKOk5wvpsZdKUDzFSteis7YkS4rI9+jvaNTensXYlJ80rKr30fSB2RsN3Jgo/c
GXXeovW9a6CID9Fg904g6Uwt5ioVM0owbLUkS8J5mezZQBHUxb4UzX8wC8PUcO3q
qn1JEats+kuYT90qrmVaBdNBEUlSa+T8o6QgyCXJ/h5JGwIDAQABozgwNjA0BgNV
HREELTArgglsb2NhbGhvc3SCBzAuMC4wLjCCCTEyNy4wLjAuMYcEAAAAAIcEfwAA
ATANBgkqhkiG9w0BAQsFAAOCAQEATNh/qOMm71Y0Thjq2hguIaix/3AC5bU2AjKt
OObJf8fNsUoFZwMrg5kRkxBEnHF0tAiMBW1vBBKK93pEAp0K4XBA0mgb1n9LgqOf
oDIgOKKiymm3AqhmBytjDQ1yrQRvVb8eIary0YoCCyYFAqox7hNrLkaV0x42BYJ0
BQRBj1iorJ+tog/2Bw2vt7cFv9Mm1quaiyEgmk1yMoaEtsZqbvIS1xFjmrjM+1iR
mhKa6NyPXiXracKEWo5cjm95QT7SPZ/guctdeijKOrNz/y0+LO/jTWrRozp7lwhp
IBzOaHLWGzR7SWpRRajeqZxhnRo3bzH/EuqP+JWhRz88HXBUPA==
-----END CERTIFICATE-----
server.crt
-----BEGIN CERTIFICATE-----
MIIDgTCCAmmgAwIBAgIJAKD3AgGoknU7MA0GCSqGSIb3DQEBCwUAMGExCzAJBgNV
BAYTAkxLMRAwDgYDVQQIDAdXZXN0ZXJuMRAwDgYDVQQHDAdDb2xvbWJvMQ0wCwYD
VQQKDARBVklYMQswCQYDVQQLDAJDQTESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIx
MDYzMDEzMzMwNloXDTMxMDYyODEzMzMwNlowZTELMAkGA1UEBhMCTEsxEDAOBgNV
BAgMB1dlc3Rlcm4xEDAOBgNVBAcMB0NvbG9tYm8xDTALBgNVBAoMBEFWSVgxDzAN
BgNVBAsMBlNlcnZlcjESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B
AQEFAAOCAQ8AMIIBCgKCAQEAvIIidvnwytSjDrjMsmeXtyN7BtHOdSXZ99rqpfog
vVN8nlAy2YGRXk5XK4qZFuZYMDD0J86BTRbHLb8uFlZ8YywScyiM9g+UwcSGr2+Z
35PnB7C/sBJAbPQpK7XCn4v24Tfe5yLRaaJVdg3fmpf1TNe5Sfq+vwBsI9XSkh2A
A8gZupZcDqkVFNpaxwc/9eVpF957tJLn9Qnr9R0wTKUtKrJP1GUZUD2OZhLWpJDA
UNKnUhm1rD4e0CS8z5OtN73NiUi4nmzkOOs0G3IIcz7ZpHU2hk7Y802Xmnbv9RI2
fvPk5BSshDhhw7oqaT7tTed7QTbWb+iOB4UuP9ukZSM8QwIDAQABozgwNjA0BgNV
HREELTArgglsb2NhbGhvc3SCBzAuMC4wLjCCCTEyNy4wLjAuMYcEAAAAAIcEfwAA
ATANBgkqhkiG9w0BAQsFAAOCAQEAWlr/+wq1CuVz8TKXV5pSyFEE2aGxnGPYbAWa
czuYxC0nMMQUZvPULsRixFBO+MQnYHqND3YTAxUW0qJLrb+SZhF60Q/3rlgU4tKb
bolUlaRwgvoJkOnWK777yKoHC+bNTv3NKOsRLB0J5zsjhHZu3AnooTgKmQP3Y6ks
E1gDEhYCZf9+djxzQ0n/PH1xgPXpFSOZwiiUrbxBCmqjID3C3jZsf87zJv+pMFZJ
xCiJ0QCc3D34x+DE1t2XIAc1hwu/XaXGabDVXf/yz3Hq5nEFdSrzDzmVRcBpkJUy
p4KAKYJ9ghXAcgTKAzQAjqhCoEuc1M4OKx9uTKnBZyzpoKGy3g==
-----END CERTIFICATE-----
client.key
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCryzfbZ25UcesD
Bkl7yYeLJ5nKtvcvNBSS4oD4ts0iIO36kpZmKWsOjhshsHJX9fz39z0ZiF6sSqCw
G7K5kcgEBkE1nan9ULR5FPkaOFdvCWIzMtwysBCve9fLyIiAm85Y3YAGSxZWY5Fo
giW/OGaOLjs7snwt48daqi6kIITuOQo6TnC+mxl0pQPMVK16KztiRLisj36O9o1N
6exdiUnzSsqvfR9IHZGw3cmCj9wZdd6i9b1roIgP0WD3TiDpTC3mKhUzSjBstSRL
wnmZ7NlAEdTFvhTNfzALw9Rw7eqqfUkRq2z6S5hP3SquZVoF00ERSVJr5PyjpCDI
Jcn+HkkbAgMBAAECggEAG5bmYJEK0fr9kB3JXVzvLLTuaPTbW7RUfYZsygSpiwqY
zX4PM1cBf3Po/Y8VvfSebMBDCr+w3KCbj8zS9iRoZ+UNQewRKcJ1SWDJ5KEZwZZT
M/++B1HpTBVqgaqx6V2cVP81mU0jyDnzV0JXh48qQTlBgAPZF5GWKYRdOzSRxKQX
Cd1wDOkvWcsfb4u4JkMFftHDt0F1j8w28FXzJZvLOmGWL1W9QN+Pw5ybB6tVjlcH
62x59r5fdsSVJmz4lPc3oPljvw4ldZrbtyjMNl3cmpAry+L2yETFof2/e+v6kS0N
zwf7QqphWp2YI9ocp6Q0A2iJeUtrMsWMzmfMFk4+OQKBgQDa1QgdAEHMqBTHJoEP
3g9r4XgrfsUnwldJRMlDwUUFxfclYQZM4qR5Sh6wsNOgK86OQhxVB3XmWiKGlI9Q
PBnTycwnx9U4yuKQaIAKHA81F7ypZiaMsyT4lO0yszHSdamP7kWmt6zSG7Ymh8ez
rZSUL/dPTwFmGkTkE+JorUVslQKBgQDI+O2lumDh5lvQr5OOYlAcapwvK3m2bmZt
4dk8BM78DhN9+mGZDVSxSrFpa01XAf9n3QI8mr0+9Gk32+U+0BSwEFOY0DrY2F8V
h3OuPblgVabpEUDT/DPizS0KPa/fXBG/UHLtFqDqCBrJpKCa2I1koDfw03Xa1MFX
m0TNn7HC7wKBgQCuGiFLWYIAPpXRYEv8gNLKJOeDy2c2W8MwUt7B3kl472jTP19Q
2ly69wJ11yl33IKI4gZCjB38OCgR5K4kyrxNqNaZq75zYc9hzxJXeL2MoycBq3ju
JjU65NnWpYT1CZHvJkYZ/zYrL8r62rpCtM5qG8NqQNQ/FJVM1JkkaP68BQKBgDUJ
NDXrJIwY7Zv56bfGjBujqsnr8DfLlK5kzp4/Kpxr3or9W2o3kWUkMAOA9K0EcvFm
6qMxtsV5XJcIKcxUv1DjThxiv1TgbN9Smp2d3mDk2aJiEjifMtYkjUFJnDMWqfxR
77JPqXF5aBBUqKmUgmPOwFSGiXCReuByskqNXSzzAoGBAKAC6xRGW+mgSq/GlUI6
ltbKjkzLkfY3M7eflUHtedLRpID0DRMgQPMCWlUefcK8qb7HvivhryWyZDHTnI/+
1KvgHhfUwFvrGtG7/omf+x+weav5cYevs1+pJt2PJMHFlFHeBJbh2tBCvEv9DgXM
OtjyRXr703tZjlsqHM/MvRnL
-----END PRIVATE KEY-----
server.key
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC8giJ2+fDK1KMO
uMyyZ5e3I3sG0c51Jdn32uql+iC9U3yeUDLZgZFeTlcripkW5lgwMPQnzoFNFsct
vy4WVnxjLBJzKIz2D5TBxIavb5nfk+cHsL+wEkBs9CkrtcKfi/bhN97nItFpolV2
Dd+al/VM17lJ+r6/AGwj1dKSHYADyBm6llwOqRUU2lrHBz/15WkX3nu0kuf1Cev1
HTBMpS0qsk/UZRlQPY5mEtakkMBQ0qdSGbWsPh7QJLzPk603vc2JSLiebOQ46zQb
cghzPtmkdTaGTtjzTZeadu/1EjZ+8+TkFKyEOGHDuippPu1N53tBNtZv6I4HhS4/
26RlIzxDAgMBAAECggEADV6NYzLGyRvblKe2nNCMCgkSKKGoqtZMZy+3hJ4AxGyv
Yer+MD7oOHAF7qidwuoDRyexqUEUGouLBMuSUTNBDyeJB3HQzYK9YiC2DKs1lo2+
zOTUM4CX4SJP6TGy4+3hTIfR30Kw53UD6qOBQE2WsCR8l6qY7KLzCHeZv5MoCRyA
zH5BLRscqWrqTLCaXl1ucrxBDy6MjFHdcaDNg6E6wawd+CCKEO9SUc4HpFu3a1F3
/XnBCdRZvhMssSL4n89u14a+wjhGBs1fSEv9uelYhE2u1nPz2PTwwb/4Kphkswq6
OlPlH+MyA6VHOKZaHyoSvsx8oY8KpUiZNaTXs8hJAQKBgQDfX1POfvgDS21k0m4C
yW4vvunF0pwDj29elcQeOh6VYLNuUXCA/G4s3WwyD2HBL99uyfHFtZAqmV+UPV1B
TkkdVvvKMRS9OVJdZ6El+0S/Om5jGKtpqBdgojIu6JCPg/4DXOnyO+MP58SRN9Fv
aClNP97mR85rsJhhNyyLN8Q20wKBgQDYCx6M74K1AdkmATd5J6FCJpOQrm751Elh
jwgLOAwwhjPGZmW2piEoMPLKWDFuqJytWq0WGaHcSXIrawQYCn/HKoCUDAVNYhvO
5k6R82bNb2nK1CYsDJmzS/JAr/IMkwF8yc3JfvPWjK/SlnaXlxTgaH2kekuxTX5J
03nt9kte0QKBgHUVUqh6hSW087JVWrS7QLZmK8M1kfGLzaSWfCYb8Mv71BGPPpld
t6JsrbnPtj29CjDYSZKIiPjLS/437Bm/HzPx1Ck3cNd+pF2xZBa0jmRuDE6FL9RB
937/ha5w2NVhKlQ476e5HFvJ8I7t4ymyP3PudrfeC6UG+9h5Qy/2GzNhAoGBAJ0G
p/Pj+eq6Jyy6YamLpmfARi04u6yWIafpH/2XhTSXtnYT4wX4hc0CltV/eJ5nq0lM
yNVxV/YW5Aaux9D5gJowSDhS7F+sWW3uc1kdwrC6s/gaboxBtE1fe/qMZ6/AAarD
uvEjdvOQWhMEDbpss13QVNIBmZaLLqlWLh9JjsqhAoGAWs/VIQ3pxf8X6HHEDkWH
NL9mClNT3xfMTvZsndMUjHge6rbqgrsipGgbiz9UHD7ftCG13bWEJNor7WsucWEK
H40WeHpKCXUJR85Hng4olJ2kbGGVeRJNqIbyCFw9JTRgS6ZMG+U9SacfCV8uGxRt
JnU71wydqJBv9xqPeEyrbQs=
-----END PRIVATE KEY-----