-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpg_replication_client.rb
152 lines (130 loc) · 4.27 KB
/
pg_replication_client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# frozen_string_literal: true
require 'logger'
require 'ostruct'

require 'concurrent'
require 'nio'
require 'pg'
require 'pry'

require_relative 'conn_utils'
require_relative 'decoder'
require_relative 'encoder'
require_relative 'replication_state'
# Make an unhandled exception in any thread abort the whole process instead of
# only terminating that thread. NOTE: this is a process-wide setting that takes
# effect as soon as this file is loaded.
Thread.abort_on_exception = true
# Consumes a PostgreSQL logical replication stream on a background thread.
#
# Connecting, slot creation and stream start-up are delegated to ConnUtils.
# Incoming copy-data rows are decoded by Decoder and every Decoder::XLogData
# message is handed to the configured callback. Standby status updates built by
# Encoder are written back every `status_interval` seconds and whenever a
# keepalive asks for a reply.
class PgReplicationClient
  DEFAULT_CONF = {
    host: 'localhost',
    port: 5432,
    user: 'postgres',
    password: nil,
    dbname: 'postgres',
    slotname: 'pg_logical_test', # replication slot name
    status_interval: 10, # interval between status updates sent to the server, in seconds
    plugin: 'wal2json', # server-side WAL decoder plugin
    plugin_opts: %q(("include-types" 'false', "pretty-print" 'true')),
    create_slot: true # create slot on startup
  }.freeze

  DEFAULT_LOGGER = Logger.new(STDOUT)
  DEFAULT_LOGGER.level = Logger::INFO

  # Seconds to wait before reconnecting after the stream fails.
  RETRY_DELAY_SECONDS = 5

  # Observer attached to the periodic status-update task; it only logs the
  # outcome of each execution.
  class StatusUpdateObserver
    attr_reader :logger

    # @param logger [Logger] destination for the outcome messages
    def initialize(logger:)
      @logger = logger
    end

    # Observer callback invoked after each task execution.
    #
    # @param _time [Time] execution time (unused)
    # @param result [Object, nil] value returned by the task block, if any
    # @param exception [Exception, nil] error raised by the task block, if any
    def update(_time, result, exception)
      if result
        logger.debug "#{self.class} Sent standby status update. Replication state is now: #{result.inspect}"
      elsif exception.is_a?(Concurrent::TimeoutError)
        logger.warn "#{self.class} Execution timed out"
      else
        logger.error "#{self.class} Execution failed with error #{exception}"
      end
    end
  end

  # `thread` is the worker started by #start (nil before that); `conn` is the
  # current connection and is nil whenever no stream is open.
  attr_reader :logger, :conf, :thread, :conn

  # @param callback [#call] receives every Decoder::XLogData message; defaults
  #   to logging the message at INFO level
  # @param logger [Logger]
  # @param conf [Hash, #to_h, nil] overrides merged on top of DEFAULT_CONF
  def initialize(callback: ->(message) { logger.info message }, logger: DEFAULT_LOGGER, conf: DEFAULT_CONF)
    # `to_h` lets callers pass nil (no overrides) or any hash-like object.
    @conf = OpenStruct.new(DEFAULT_CONF.merge(conf.to_h))
    @logger = logger
    @callback = callback
    @conn_utils = ConnUtils.new(conf: @conf, logger: @logger)
    @write_lock = Mutex.new
    logger.info "host=#{@conf.host} dbname=#{@conf.dbname} port=#{@conf.port} user=#{@conf.user} " \
                "slotname=#{@conf.slotname} status_interval=#{@conf.status_interval} (seconds)"
  end

  # Starts the streaming worker thread. Calling it again while the worker is
  # still alive is a no-op, so only one stream is ever consumed per client.
  #
  # @return [Thread] the worker thread
  def start
    return @thread if @thread&.alive?

    @thread = Thread.new(&method(:stream_log))
  end

  # Stops the periodic status updates, tells the server the copy stream is over
  # and kills the worker thread. The worker is killed even when ending the copy
  # stream raises.
  #
  # @return [Thread, nil] the killed worker thread, or nil if never started
  def shutdown
    begin
      # Stop the timer first so no status update is written after CopyDone.
      @state_update_task&.shutdown
      end_copy_stream
    ensure
      Thread.kill(@thread) if @thread
    end
    @thread
  end

  private

  # Worker entry point: runs the stream and reconnects forever after any
  # StandardError, waiting RETRY_DELAY_SECONDS between attempts.
  def stream_log
    run_stream
  rescue StandardError => e
    logger.warn "pg-logical: (stream_log) #{e}"
    logger.warn e.backtrace.join("\n")
    logger.warn "retrying in #{RETRY_DELAY_SECONDS} seconds..."
    sleep RETRY_DELAY_SECONDS
    retry
  end

  # One connection attempt: connect, optionally create the slot, start
  # streaming, install the status timer and block reading messages. The
  # `ensure` lives here (not in #stream_log) because `retry` does not run the
  # ensure clause of the method it restarts; this way every failed attempt
  # releases its connection and timer before the next one begins.
  def run_stream
    @conn = @conn_utils.connect
    @conn_utils.create_replication_slot if @conf.create_slot
    @replication_state = @conn_utils.start_streaming
    @state_update_task = install_state_update_task
    wait_for_message
  ensure
    release_stream
  end

  # Stops the status timer and closes the connection of the current attempt.
  # Safe to call when either is missing or the connection is already closed.
  def release_stream
    task = @state_update_task
    @state_update_task = nil
    task&.shutdown
  ensure
    conn = @conn
    @conn = nil
    conn.finish unless conn.nil? || conn.finished?
  end

  # Sends the end-of-copy marker on the open connection, if there is one.
  def end_copy_stream
    conn = @conn
    return if conn.nil? || conn.finished?

    conn.put_copy_end
    conn.flush
  end

  # Blocks forever, decoding pending messages each time the connection socket
  # becomes readable. Only leaves by raising (or when the thread is killed).
  def wait_for_message
    selector = NIO::Selector.new
    selector.register(@conn.socket_io, :r)
    loop { selector.select { decode } }
  ensure
    selector&.close
  end

  # Creates and starts the timer that periodically sends a standby status
  # update.
  #
  # @return [Concurrent::TimerTask] the running task
  def install_state_update_task
    # NOTE(review): recent concurrent-ruby releases appear to ignore
    # `timeout_interval` - confirm against the version this project pins.
    task = Concurrent::TimerTask.new(execution_interval: @conf.status_interval, timeout_interval: 2) do
      send_standby_status_update
    end
    task.add_observer(StatusUpdateObserver.new(logger: @logger))
    task.execute
    task
  end

  # Reads whatever the socket has to offer and processes every complete row
  # that is already buffered. Draining matters: a single readable event can
  # carry several rows, and rows left in the client-side buffer would not make
  # the socket readable again.
  #
  # @raise [RuntimeError] when the server ends the copy stream
  def decode
    @conn.consume_input
    loop do
      data = @conn.get_copy_data(true, nil)
      break if data == false # nothing more buffered; wait for the socket again
      raise 'Replication stream ended by server' if data.nil?

      handle_message(Decoder.decode(data))
    end
  end

  # Dispatches one decoded message. The callback runs before the replication
  # state is advanced so a status update never reports an LSN whose message the
  # callback has not finished (or has failed) processing.
  def handle_message(message)
    @callback.call(message) if message.is_a?(Decoder::XLogData)
    update_replication_state(message)
    reply(message)
    logger.debug "Received a Keepalive: #{message}" if message.is_a?(Decoder::Keepalive)
  end

  # Answers a keepalive immediately when the server requested a reply.
  def reply(message)
    send_standby_status_update if message.is_a?(Decoder::Keepalive) && message.requires_reply
  end

  # Moves the tracked LSN forward; it never moves backwards.
  def update_replication_state(message)
    current = @replication_state.curr_lsn
    @replication_state.curr_lsn = message.end_lsn if current.nil? || message.end_lsn > current
  end

  # Builds and writes a standby status update for the current LSN.
  #
  # @return [Object] the replication state, with `last_sent_at` refreshed
  def send_standby_status_update
    now = Time.now
    payload = Encoder.build_status_update(@replication_state.curr_lsn, now)
    write_copy_data(payload)
    @replication_state.tap { |rs| rs.last_sent_at = now }
  end

  # Writes one copy-data message and flushes it right away.
  #
  # It's possible to receive a keepalive with the `reply_required` flag set,
  # in which case we reply immediately from the worker thread. Therefore we
  # need a lock here to avoid writing at the same time as the status timer.
  # (Deliberately not named `send`, which would shadow Object#send.)
  #
  # @raise [RuntimeError] when the message could not be queued
  def write_copy_data(message)
    @write_lock.synchronize do
      raise 'Could not send reply' unless @conn.put_copy_data(message)

      # Flush after queueing so the reply is not left in the output buffer.
      @conn.flush
    end
  end
end