Consumer offsets are not getting translated to destination #2

Open
shravangit20 opened this issue Aug 16, 2020 · 5 comments

@shravangit20

We ran MirrorMaker 2 on the Kafka Connect framework as per the instructions, and the topics and data were migrated from source to destination.
However, the offsets on the destination appear to start from "0" instead of being translated from the source. This would force all clients to consume from the beginning after switching to MSK. I assume there is an issue with the consumer group configuration, but I cannot identify what is missing.

Below are the configurations used:
mm2-msc-cust-repl-policy:
{
"name": "mm2-msc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"clusters": "msksource,mskdest",
"source.cluster.alias": "msksource",
"target.cluster.alias": "mskdest",
"target.cluster.bootstrap.servers": "Destination MSK bootstraps",
"source.cluster.bootstrap.servers": "Source kafka bootstraps",
"topics": ".*",
"tasks.max": "4",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
"replication.factor": "3",
"offset-syncs.topic.replication.factor": "3",
"sync.topic.configs.interval.seconds": "20",
"refresh.topics.interval.seconds": "20",
"refresh.groups.interval.seconds": "20",
"consumer.group.id": "preprod-cgi",
"producer.enable.idempotence":"true",
"sync.topic.acls.enabled": "false"
}
The consumer.group.id specified here was not created by MirrorMaker 2. We used the same consumer.group.id when running the group offset sync application, and it could not identify the consumer group. Our understanding was that this is the consumer group created for the MirrorMaker consumer, and that after the MirrorMaker producer writes the data to MSK, the offsets for "preprod-cgi" would be translated on the destination.

java -jar /tmp/kafka/MM2GroupOffsetSync-1.0-SNAPSHOT.jar -cgi preprod-cgi -src msksource-ps -pfp /tmp/kafka/consumer.properties -rpc com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy > /tmp/preprod-v4.log 2>&1 &

mm2-cpc-cust-repl-policy:
{

"target.cluster.bootstrap.servers": "Destination MSK bootstraps",
"source.cluster.bootstrap.servers": "Source kafka bootstraps",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
"replication.factor": "3",
"checkpoints.topic.replication.factor": "3",
"emit.checkpoints.interval.seconds": "20"

}

Also, the Kafka Connect service is started with a connect-distributed.properties file that contains a group.id. Does this have any particular significance, or can it conflict with the consumer group id above from mm2-msc-cust-repl-policy?

Reference for source vs destination offsets post migration:
Source:
agent-payloads:0:235943806
agent-payloads:1:0
agent-payloads:2:0
agent-payloads:3:0
agent-payloads:4:0

Destination:
agent-payloads:0:24483600
agent-payloads:1:0
agent-payloads:2:0
agent-payloads:3:0
agent-payloads:4:0

Thanks and appreciate any help on this.

Thanks,
Shravan

@shravangit20
Author

@rcchakr It would be great if you can share your thoughts on the above situation. Thanks in advance.

@rcchakr
Contributor

rcchakr commented Aug 17, 2020

Hi @shravangit20 The consumer group id for the MM2GroupOffsetSync application (the -cgi parameter) should be the id of the consumer group you are trying to migrate from the source to the destination cluster, not the consumer group id for MM2 or the underlying Kafka Connect cluster. So, if you have a consumer running in the source cluster with a group.id of consumer1 against topic ExampleTopic, which you are migrating to the destination cluster, you would specify consumer1. That is the group being checkpointed to MM2's internal checkpoints topic in the destination cluster, which the MM2GroupOffsetSync application reads and syncs to the destination cluster's __consumer_offsets internal topic.

Hope this helps.
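
To illustrate why the group passed via -cgi has to be the source consumer group, here is a minimal Python sketch of the lookup described above. It is a simplified model, not the MM2GroupOffsetSync implementation: the `Checkpoint` dataclass, the `offsets_to_commit` helper, and all sample offsets are hypothetical, and it assumes checkpoint records are keyed by the source consumer group id and carry a downstream (destination) offset.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Checkpoint:
    """Simplified stand-in for one record in the MM2 checkpoints topic."""
    group: str              # consumer group id as it exists on the source
    topic: str
    partition: int
    upstream_offset: int    # committed offset on the source cluster
    downstream_offset: int  # equivalent offset on the destination cluster


def offsets_to_commit(checkpoints, group):
    """Return {(topic, partition): downstream_offset} for one group.

    Later records override earlier ones, mimicking reading the topic in
    order and keeping the latest checkpoint per partition.
    """
    latest = {}
    for cp in checkpoints:
        if cp.group == group:
            latest[(cp.topic, cp.partition)] = cp.downstream_offset
    return latest


# Hypothetical checkpoint records for a source group named "consumer1".
checkpoints = [
    Checkpoint("consumer1", "ExampleTopic", 0, 1000, 40),
    Checkpoint("consumer1", "ExampleTopic", 0, 1500, 540),
    Checkpoint("consumer1", "ExampleTopic", 1, 200, 200),
]

print(offsets_to_commit(checkpoints, "consumer1"))
# {('ExampleTopic', 0): 540, ('ExampleTopic', 1): 200}

# A group id that only exists in the connector config has no checkpoints.
print(offsets_to_commit(checkpoints, "preprod-cgi"))
# {}
```

In this model, looking up "preprod-cgi" returns nothing because no consumer on the source ever committed offsets under that id, which would match the sync application failing to identify the group.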

@shravangit20
Author

Thank you @rcchakr for clarifying. I really appreciate your timely response. So, if I understand correctly, I need to use the existing source consumer group id when running the MM2GroupOffsetSync application? We have 20+ consumer groups on the source, so after all connectors are running we need to run the MM2GroupOffsetSync application once for each consumer group, correct?
Also, as per the AWS Lab documentation, the consumer.properties file used by MM2GroupOffsetSync contains only the destination bootstrap servers. MM2GroupOffsetSync does not use any source bootstrap servers; the only source-side input is the consumer group we pass in, correct?
One last question: after running MM2GroupOffsetSync with the source consumer group, the same consumer group will be mirrored onto the destination, correct?
Thanks,
Shravan

@rcchakr
Contributor

rcchakr commented Aug 17, 2020

Hi @shravangit20 Yes, you would have to run a separate instance of the app for each consumer group, or you could modify the code to run it for all consumer group ids. Also, yes, you only need the destination bootstrap brokers, because the app runs entirely against the destination cluster: it reads the checkpoints topic and syncs to the __consumer_offsets topic there. MM2 does the work of syncing the source consumer group ids and offsets from the source to the destination checkpoints topic.
The answer to your last question is yes.
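
As a rough sketch of the "one instance per consumer group" approach, the Python snippet below builds one command line per group, reusing the flags and paths from the command posted earlier in this thread. The group names and the `sync_command` helper are hypothetical, and the snippet only prints the commands rather than launching them.

```python
import shlex

# Paths and flag values copied from the command posted earlier in this thread.
JAR = "/tmp/kafka/MM2GroupOffsetSync-1.0-SNAPSHOT.jar"
SRC_ALIAS = "msksource-ps"
PROPS = "/tmp/kafka/consumer.properties"  # destination bootstrap servers only
POLICY = "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy"


def sync_command(group):
    """Build the argument list for one MM2GroupOffsetSync instance."""
    return [
        "java", "-jar", JAR,
        "-cgi", group,       # source consumer group to migrate
        "-src", SRC_ALIAS,
        "-pfp", PROPS,
        "-rpc", POLICY,
    ]


# Hypothetical source consumer group ids; replace with the real ones.
source_groups = ["consumer1", "consumer2", "consumer3"]

for group in source_groups:
    # Print a shell-safe command line for each group (dry run).
    print(shlex.join(sync_command(group)))
```

Each argument list could be handed to `subprocess.Popen` to start the instances, with a separate log file per group so that output from the 20+ processes stays distinguishable.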

@shravangit20
Author

Awesome. We will try another run. Thank you!!
