# File lib/fluent/output.rb, line 180
# Runs the parent initializer, then resets the flush/retry bookkeeping
# used by the rest of this output to its starting values.
def initialize
  super

  # Timestamps all start at zero.
  @next_flush_time = @last_retry_time = @next_retry_time = 0

  # Counters start at zero as well.
  @num_errors = @emit_count = 0

  # Lock used by the retry logic whenever it updates @num_errors.
  @num_errors_lock = Mutex.new

  # Default for the 'secondary_limit' setting; configure may override it.
  @secondary_limit = 8
end
# File lib/fluent/output.rb, line 411
# Forwards the before_shutdown notification to the buffer, passing this
# output along. A StandardError raised by the buffer is logged as a warning
# (message plus backtrace) and swallowed rather than propagated.
def before_shutdown
  @buffer.before_shutdown(self)
rescue => e
  $log.warn "before_shutdown failed", error: e.to_s
  $log.warn_backtrace
end
# File lib/fluent/output.rb, line 420
# Computes how long to wait before the next retry.
#
# The base wait is @retry_wait doubled per error. Once @num_errors exceeds
# @retry_limit (and the limit is not disabled) the exponent is counted from
# the start of the secondary phase instead. A finite base gets a random
# offset in the range [-wait/8, +wait/8). The result is capped at
# @max_retry_wait when that is set.
def calc_retry_wait
  # TODO retry pattern
  exponent =
    if @disable_retry_limit || @num_errors <= @retry_limit
      @num_errors - 1
    else
      # secondary retry
      @num_errors - 2 - @retry_limit
    end

  base = @retry_wait * (2 ** exponent)

  # Only finite waits are randomised.
  jittered = base
  jittered = base + (rand * (base / 4.0) - (base / 8.0)) if base.finite?

  return jittered unless @max_retry_wait
  [jittered, @max_retry_wait].min
end
# File lib/fluent/output.rb, line 209
# Applies the configuration element +conf+ to this output.
#
# After the parent's configure, this:
# * converts @retry_wait to Float,
# * creates and configures the buffer named by @buffer_type,
# * creates and configures @num_threads writer threads,
# * sets up an optional secondary output from a <secondary> child element,
# * registers the "queue_size" and "emit_count" status callbacks.
#
# Raises ConfigError when 'secondary_limit' is given and converts to a
# negative integer.
def configure(conf)
  super

  # converted to Float for calc_retry_wait
  @retry_wait = @retry_wait.to_f

  @buffer = Plugin.new_buffer(@buffer_type)
  @buffer.configure(conf)
  # Parallel mode is enabled only when more than one writer thread is used.
  @buffer.enable_parallel(@num_threads != 1) if @buffer.respond_to?(:enable_parallel)

  @writers = 1.upto(@num_threads).map do
    OutputThread.new(self).tap { |thread| thread.configure(conf) }
  end

  sconf = conf.elements.find { |element| element.name == 'secondary' }
  if sconf
    # The type is looked up on the secondary section first, then on the
    # parent; '@type' is preferred over the plain 'type' key.
    type = sconf['@type'] || conf['@type'] || sconf['type'] || conf['type']
    @secondary = Plugin.new_output(type)
    @secondary.router = router
    @secondary.configure(sconf)

    limit = conf['secondary_limit']
    if limit
      @secondary_limit = limit.to_i
      raise ConfigError, "invalid parameter 'secondary_limit #{limit}'" if @secondary_limit < 0
    end

    @secondary.secondary_init(self)
  end

  Status.register(self, "queue_size") { @buffer.queue_size }
  Status.register(self, "emit_count") { @emit_count }
end
# File lib/fluent/output.rb, line 265
# Counts the call, formats the event stream +es+ for +tag+ and hands the
# formatted data to the buffer under +key+. A flush is submitted when the
# buffer's emit returns a truthy value.
def emit(tag, es, chain, key="")
  @emit_count += 1
  formatted = format_stream(tag, es)
  submit_flush if @buffer.emit(key, formatted, chain)
end
# Intentionally empty: accepts +chunk+, does nothing and returns nil.
# NOTE(review): presumably concrete outputs override this to write the
# chunk to their destination - confirm against subclasses.
def write(chunk)
end
# File lib/fluent/output.rb, line 293
# Pushes every key currently reported by the buffer.
# +force+ is accepted but not used by this implementation.
def enqueue_buffer(force = false)
  @buffer.keys.each do |buffer_key|
    @buffer.push(buffer_key)
  end
end
# File lib/fluent/output.rb, line 442
# Pops from the buffer, passing +secondary+ instead of this output, and
# returns whatever the buffer's pop returns.
def flush_secondary(secondary)
  buffer = @buffer
  buffer.pop(secondary)
end
# File lib/fluent/output.rb, line 403
# Sets the next retry time to one second in the past while holding the
# error lock, then enqueues the buffer with force = true and submits a
# flush. Returns the result of submit_flush.
def force_flush
  @num_errors_lock.synchronize { @next_retry_time = Time.now.to_f - 1 }
  enqueue_buffer(true)
  submit_flush
end
# File lib/fluent/output.rb, line 279
# Formats each (time, record) pair yielded by +es+ with #format and
# returns the results concatenated, in order, into one new String.
def format_stream(tag, es)
  formatted = ''
  es.each do |time, record|
    formatted << format(tag, time, record)
  end
  formatted
end
# File lib/fluent/output.rb, line 259
# Shuts down, in this order: every writer thread, the secondary output
# (when one is configured), and finally the buffer.
def shutdown
  @writers.each(&:shutdown)
  @secondary.shutdown if @secondary
  @buffer.shutdown
end
# File lib/fluent/output.rb, line 250
# Schedules the first flush @flush_interval seconds from now, then starts
# the buffer, the secondary output (when present) and every writer thread,
# in that order. Finally resets the writer rotation state that
# submit_flush reads.
def start
  @next_flush_time = Time.now.to_f + @flush_interval

  @buffer.start
  @secondary.start if @secondary
  @writers.each(&:start)

  # Rotation state: current index and the writer count cached for modulo.
  @writer_current_position = 0
  @writers_size = @writers.size
end
# File lib/fluent/output.rb, line 273
# Advances the writer index by one (wrapping at @writers_size) and asks the
# writer at the new index to flush.
def submit_flush
  # Without locks: it is rough but enough to select the "next" writer
  @writer_current_position = @writer_current_position.succ.modulo(@writers_size)
  writer = @writers[@writer_current_position]
  writer.submit_flush
end
# File lib/fluent/output.rb, line 299
# Attempts one flush of the buffer queue and returns a Float timestamp
# (seconds since the epoch).
# NOTE(review): the returned value is presumably the time at which the
# calling writer thread should invoke try_flush again - confirm in caller.
#
# Flow:
# 1. If the queue is empty and @next_flush_time has passed, enqueue the
#    buffered data (re-checked under the buffer's lock) and reschedule.
#    If the queue is still empty, return time + @try_flush_interval.
# 2. If earlier errors are pending (@num_errors != 0), only proceed when
#    @next_retry_time has passed; the error count and next retry time are
#    advanced up front on the assumption that this attempt fails too.
# 3. Pop one chunk, via the secondary output once @num_errors exceeds
#    @retry_limit (unless the limit is disabled), otherwise via self.
# 4. On success the error count is reset. On StandardError the failure is
#    logged; depending on the error count the flush is retried later,
#    retried immediately through the secondary, or the buffer is discarded
#    with write_abort. @next_retry_time is returned in that case.
def try_flush
  time = Time.now.to_f

  empty = @buffer.queue_size == 0
  if empty && @next_flush_time < (now = Time.now.to_f)
    @buffer.synchronize do
      # re-check under the lock before enqueueing
      if @next_flush_time < now
        enqueue_buffer
        @next_flush_time = now + @flush_interval
        empty = @buffer.queue_size == 0
      end
    end
  end
  return time + @try_flush_interval if empty

  begin
    retrying = !@num_errors.zero?

    if retrying
      @num_errors_lock.synchronize do
        retrying = !@num_errors.zero? # re-check in synchronize
        if retrying
          if @next_retry_time >= time
            # allow retrying for only one thread
            return time + @try_flush_interval
          end
          # assume the next retry fails as well;
          # the counters are cleared when it succeeds
          @last_retry_time = time
          @num_errors += 1
          @next_retry_time += calc_retry_wait
        end
      end
    end

    if @secondary && !@disable_retry_limit && @num_errors > @retry_limit
      has_next = flush_secondary(@secondary)
    else
      has_next = @buffer.pop(self)
    end

    # success
    if retrying
      @num_errors = 0
      # Note: don't notify other threads, to prevent a
      #       burst against the recovered server
      $log.warn "retry succeeded.", plugin_id: plugin_id
    end

    if has_next
      return Time.now.to_f + @queued_chunk_flush_interval
    else
      return time + @try_flush_interval
    end

  rescue => e
    if retrying
      error_count = @num_errors
    else
      # first error
      error_count = 0
      @num_errors_lock.synchronize do
        if @num_errors.zero?
          @last_retry_time = time
          @num_errors += 1
          @next_retry_time = time + calc_retry_wait
        end
      end
    end

    if @disable_retry_limit || error_count < @retry_limit
      $log.warn "temporarily failed to flush the buffer.", next_retry: Time.at(@next_retry_time), error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
      $log.warn_backtrace e.backtrace

    elsif @secondary
      if error_count == @retry_limit
        $log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
        $log.warn "retry count exceededs limit. falling back to secondary output."
        $log.warn_backtrace e.backtrace
        # Bounded: the re-run takes the retrying path, which bumps
        # @num_errors past @retry_limit before the next attempt.
        retry # retry immediately
      elsif error_count <= @retry_limit + @secondary_limit
        $log.warn "failed to flush the buffer, next retry will be with secondary output.", next_retry: Time.at(@next_retry_time), error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
        $log.warn_backtrace e.backtrace
      else
        # error_class is logged as a String, like every other branch
        $log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
        $log.warn "secondary retry count exceededs limit."
        $log.warn_backtrace e.backtrace
        write_abort
        @num_errors = 0
      end

    else
      $log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
      $log.warn "retry count exceededs limit."
      $log.warn_backtrace e.backtrace
      write_abort
      @num_errors = 0
    end

    return @next_retry_time
  end
end
# File lib/fluent/output.rb, line 432
# Logs that buffered data is being discarded and clears the buffer.
# A StandardError raised while clearing is logged (message plus backtrace)
# and swallowed; an error from the initial log call is not rescued.
def write_abort
  $log.error "throwing away old logs."
  begin
    @buffer.clear!
  rescue => e
    $log.error "unexpected error while aborting", error: e.to_s
    $log.error_backtrace
  end
end