Refactor into Writer and Committer #234
Conversation
Still have to figure out why the integration tests are failing in CI (they pass on my local machine), but in the meantime, here are some comments to hopefully guide reviewers.
private Writer writer;
private Committer committer;
Moved pretty much all of the logic out of this IcebergSinkTask class into Writer and Committer implementations. All we do here now is manage the lifecycle of a Writer and a Committer.

This will make it easier in the future to introduce a pluggable Committer interface.
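To make the new shape concrete, here is a minimal, self-contained sketch of the slimmed-down task. The lambda stand-ins, constructor comments, and close() methods are my own illustrative assumptions, not this PR's actual API; only the Writer/Committer split itself comes from the PR:

import java.util.Collection;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: the task only manages the lifecycle of its Writer
// and Committer; all the real work lives in their implementations.
class TaskLifecycleSketch {
  interface Writer {
    void close();
  }

  interface Committer {
    void close();
  }

  private Writer writer;
  private Committer committer;

  void open(Collection<TopicPartition> partitions) {
    writer = () -> {};    // stands in for something like: new WriterImpl(config)
    committer = () -> {}; // stands in for something like: new CommitterImpl(config)
  }

  void close(Collection<TopicPartition> partitions) {
    committer.close(); // stop committing before tearing down the writer
    writer.close();
    committer = null;
    writer = null;
  }
}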
public interface Committer {
  void commit(CommittableSupplier committableSupplier);
}
This is the Committer interface I'm proposing.

I am not making this interface pluggable in this PR, nor do I intend to until the need arises. This means we can still change it after this PR without having to do a breaking release. So we don't need to get this interface 100% right here, but we should at least try to align directionally on the design of the interface.
As far as the design is concerned, I've taken some inspiration from Flink here (they have a concept of a Committable as well). Besides that, I've decided to go super simple with the API for now: just a single void commit(CommittableSupplier committableSupplier) method.

You might wonder, why not just void commit(Committable committable)? The reason is that we want to let Committer implementations decide when to force the writers to flush and produce a Committable, which by definition means closing all open files. We want to avoid closing files unnecessarily because that will result in many small files. Hence the Committer takes a CommittableSupplier, and when the Committer determines it is a good time to commit, it can call CommittableSupplier.committables to force the Writer to close any open files and produce a Committable.
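For example, here's one way a Committer implementation could exploit that contract (a sketch only: the empty Committable, the List return type, and the interval policy are assumptions on my part; only commit(CommittableSupplier) and CommittableSupplier.committables come from the discussion above):

import java.util.List;

// Sketch of a Committer that defers closing files via the supplier.
class CommitterSketch {
  interface Committable {}

  interface CommittableSupplier {
    List<Committable> committables(); // forces the Writer to close open files
  }

  interface Committer {
    void commit(CommittableSupplier committableSupplier);
  }

  // Example policy: only materialize committables once per interval, so
  // files stay open between commits instead of being closed on every call.
  static class IntervalCommitter implements Committer {
    private final long intervalMs;
    private long lastCommitMs = System.currentTimeMillis();

    IntervalCommitter(long intervalMs) {
      this.intervalMs = intervalMs;
    }

    @Override
    public void commit(CommittableSupplier committableSupplier) {
      long nowMs = System.currentTimeMillis();
      if (nowMs - lastCommitMs < intervalMs) {
        return; // not time yet: no files closed, no small files produced
      }
      List<Committable> committables = committableSupplier.committables();
      // ... hand committables to the actual commit machinery here ...
      lastCommitMs = nowMs;
    }
  }
}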
What is the purpose of having a pluggable committer interface?
Answered offline.
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
  processControlEvents();
}
Removed the flush method entirely.
All the work now happens in put.
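A sketch of the resulting control flow (the write() method name and field wiring are my assumptions): every put() both writes records and gives the committer a chance to commit, so no separate flush() hook is needed.

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;

class PutSketch {
  interface CommittableSupplier {}

  interface Writer {
    void write(Collection<SinkRecord> records);
  }

  interface Committer {
    void commit(CommittableSupplier supplier);
  }

  private Writer writer;
  private Committer committer;
  private CommittableSupplier committableSupplier;

  public void put(Collection<SinkRecord> records) {
    writer.write(records);
    // the committer decides internally whether this call actually commits
    committer.commit(committableSupplier);
  }
}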
public Map<TopicPartition, OffsetAndMetadata> preCommit(
    Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
  return ImmutableMap.of();
We're not committing to the Connect consumer group via this method anymore; all consumer-offset commits are managed manually.
KafkaUtils.commitOffsets(
    producer, offsetsToCommit, new ConsumerGroupMetadata(config.connectGroupId()));
This is where I commit to the connect- group now, right next to where I commit to the (current) source-of-truth group: control-group-id.

Note that committing to the connect- group is best-effort (same as before); the connect group is not currently the source of truth.

In a future PR, we will start committing to the connect- group exclusively (as part of worker zombie fencing) and make that the source of truth. However, that is a breaking change which I want to avoid in this PR (we should bundle up breaking changes in another PR).
Re: the future PR, if we stop committing to the connect- consumer group, won't we run into the potential for it to be deleted by Kafka due to inactivity?
I think you've misunderstood. In the future, we will still be committing to the connect- consumer group. The only change is that we will be committing to the connect- group exclusively, i.e. I intend to get rid of the connector-managed consumer group (but this is all theoretical right now; we can talk about it more when we get there).
public Integer taskId() {
  // this is for internal use and is not part of the config definition...
-  return originalProps.get(INTERNAL_TRANSACTIONAL_SUFFIX_PROP);
+  return Integer.valueOf(originalProps.get(INTERNAL_TASK_ID));
Another change I do not consider breaking, as it's clearly documented as internal.
// TODO: why putIfAbsent? why not just put?
consumerProps.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
I'm not sure I understand why this is a putIfAbsent rather than just a put; I'd like to know why. To me, the most correct thing here is to always start from latest.
I'm with you; I can't think of a good reason why you would set this to earliest. All this is doing is leaving it user-configurable with a default of latest. I can live with it if we can't figure out the mystery.
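A runnable illustration of the difference being discussed: putIfAbsent preserves a user-supplied value, while put would overwrite it with the default.

import java.util.Properties;

public class AutoOffsetResetDemo {
  public static void main(String[] args) {
    Properties consumerProps = new Properties();
    // pretend the user explicitly configured "earliest"
    consumerProps.put("auto.offset.reset", "earliest");

    // putIfAbsent keeps the user's "earliest"; put would force "latest"
    consumerProps.putIfAbsent("auto.offset.reset", "latest");

    System.out.println(consumerProps.getProperty("auto.offset.reset")); // prints: earliest
  }
}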
  thread.start();
  LOG.info("Started commit coordinator");
} else {
  thread = null;
Won't this branch result in a case where we have no coordinator thread running?

Are we making the assumption that we are not stable and therefore going to be rebalanced, but also assuming that we will eventually be stable and (eventually) not enter this branch?
Won't this branch result in a case where we have no coordinator thread running?

Yes, this is what is happening in the code today.

Are we making the assumption that we are not stable and therefore going to be rebalanced, but also assuming that we will eventually be stable and (eventually) not enter this branch?

This is my understanding. @bryanck, I would appreciate it if you could confirm whether this is why you had this logic.
The reasoning for this was to ensure we account for all subscribed topics before ordering for the leader election, e.g. if you have multiple source topics.
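One interpretation of that reasoning as a sketch (not the PR's code; the method name and election rule are my assumptions): a task only elects itself coordinator once the full partition set across all subscribed topics is known and it owns the first partition in sorted order. With only partial knowledge, ordering could crown two different "leaders", so the safe choice is to start no coordinator thread yet.

import java.util.Collection;
import java.util.Comparator;
import org.apache.kafka.common.TopicPartition;

class LeaderElectionSketch {
  static boolean isLeader(
      Collection<TopicPartition> myAssignment, Collection<TopicPartition> allPartitions) {
    // leader = owner of the first partition across ALL subscribed topics
    return allPartitions.stream()
        .min(
            Comparator.comparing(TopicPartition::topic)
                .thenComparingInt(TopicPartition::partition))
        .map(myAssignment::contains)
        .orElse(false);
  }
}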
Mostly questions. The main thrust of this is 👍.

@bryanck here's the refactor PR I mentioned earlier to enable a pluggable committer interface.
(Force-pushed from e82981c to 375bc15, then from 375bc15 to 1bd2488.)
LGTM from my end.
Changes summary: there should be no breaking changes in this PR.