Fluentd forms a tree structure to manage plugins:
  RootAgent
     |
     +------------+-------------+-------------+
     |            |             |             |
  <label>      <source>      <filter>      <match>
     |
     +----+----+
     |         |
  <filter>   <match>
Relation:
RootAgent has many <label>, <source>, <filter> and <match>
<label> has many <match> and <filter>
Next step: `fluentd/agent.rb`
Next step: `fluentd/label.rb`
# File lib/fluent/root_agent.rb, line 50
#
# Builds an empty root agent: no labels, no inputs, no started inputs, and
# emit-error log suppression switched off (interval 0, no next-log time).
# Values from +system_config+ are applied only when they are not nil.
#
# system_config:: object responding to +emit_error_log_interval+ and
#                 +without_source+ (defaults to a fresh SystemConfig).
def initialize(system_config = SystemConfig.new)
  super

  @labels = {}
  @inputs = []
  @started_inputs = []
  @suppress_emit_error_log_interval = 0
  @next_emit_error_log_time = nil

  # Only override the defaults above when the system config carries a value.
  interval = system_config.emit_error_log_interval
  suppress_interval(interval) unless interval.nil?

  # @without_source is left unset (reads as nil) when the config has no value.
  without_source = system_config.without_source
  @without_source = without_source unless without_source.nil?
end
# File lib/fluent/root_agent.rb, line 160
#
# Creates a Label called +name+, points its root_agent back at this agent,
# and registers it in @labels under +name+ (replacing any existing entry).
#
# Returns the newly created label.
def add_label(name)
  @labels[name] = Label.new(name).tap { |label| label.root_agent = self }
end
# File lib/fluent/root_agent.rb, line 146
#
# Instantiates the input plugin registered as +type+, wires it to this
# agent's event router, configures it with +conf+, and records it in @inputs.
#
# Returns the configured input plugin.
def add_source(type, conf)
  log.info "adding source", type: type

  Plugin.new_input(type).tap do |input|
    # <source> emits events to the top-level event router (RootAgent#event_router).
    # Input#configure overwrites event_router to a label's event_router if it has `@label` parameter.
    # See also 'fluentd/plugin/input.rb'
    input.router = @event_router
    input.configure(conf)
    @inputs << input
  end
end
# File lib/fluent/root_agent.rb, line 66
#
# Configures the root agent from +conf+ in three phases:
# 1. register and configure every <label> element (the ERROR_LABEL element is
#    set aside and handled by setup_error_label),
# 2. delegate to the superclass +configure+,
# 3. add every <source> element, unless @without_source is set.
#
# Raises ConfigError when a <label> has an empty argument or a <source> has
# neither '@type' nor 'type'.
def configure(conf)
  error_label_config = nil

  # initialize <label> elements before configuring all plugins to avoid
  # 'label not found' in input, filter and output.
  label_configs = {}
  conf.elements.each do |element|
    next unless element.name == 'label'

    name = element.arg
    raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?

    if name == ERROR_LABEL
      error_label_config = element
    else
      add_label(name)
      label_configs[name] = element
    end
  end

  # Call 'configure' here to avoid 'label not found'
  label_configs.each do |name, element|
    @labels[name].configure(element)
  end
  setup_error_label(error_label_config) if error_label_config

  super

  # initialize <source> elements
  if @without_source
    log.info "'--without-source' is applied. Ignore <source> sections"
  else
    source_elements = conf.elements.select { |element| element.name == 'source' }
    source_elements.each do |element|
      type = element['@type'] || element['type']
      raise ConfigError, "Missing 'type' parameter on <source> directive" unless type

      add_source(type, element)
    end
  end
end
# File lib/fluent/root_agent.rb, line 174
#
# Reports a single failed event. When @error_collector is set the event is
# emitted to it and only a summary is logged; otherwise the whole event,
# record included, is written to the log as a warning.
def emit_error_event(tag, time, record, error)
  error_info = { error_class: error.class, error: error.to_s, tag: tag, time: time }

  unless @error_collector
    # Nothing will consume the record, so include it in the log entry.
    error_info[:record] = record
    return log.warn("dump an error event:", error_info)
  end

  # A record is not included in the logs because <@ERROR> handles it.
  # This warn is for the notification
  log.warn "send an error event to @ERROR:", error_info
  @error_collector.emit(tag, time, record)
end
# File lib/fluent/root_agent.rb, line 166
#
# Looks up the label registered under +label_name+ in @labels.
#
# Returns the stored label.
# Raises ArgumentError when no (truthy) entry exists for +label_name+.
def find_label(label_name)
  found = @labels[label_name]
  return found if found

  raise ArgumentError, "#{label_name} label not found"
end
# File lib/fluent/root_agent.rb, line 186
#
# Handles a failure that occurred while emitting the event stream +es+.
# When @error_collector is set the stream is forwarded to it. Otherwise the
# failure is logged (subject to the suppression interval) and +error+ is
# re-raised to the caller.
def handle_emits_error(tag, es, error)
  error_info = { error_class: error.class, error: error.to_s, tag: tag }

  if @error_collector
    log.warn "send an error event stream to @ERROR:", error_info
    return @error_collector.emit_stream(tag, es)
  end

  now = Engine.now
  # An interval of zero disables suppression; otherwise log only once the
  # previously scheduled next-log time has passed.
  if @suppress_emit_error_log_interval.zero? || now > @next_emit_error_log_time
    log.warn "emit transaction failed:", error_info
    log.warn_backtrace
    @next_emit_error_log_time = now + @suppress_emit_error_log_interval
  end

  raise error
end
# File lib/fluent/root_agent.rb, line 100
#
# Registers the ERROR_LABEL label, configures it from element +e+, replaces
# its root_agent with a RootAgentProxyWithoutErrorCollector wrapping this
# agent, and stores the label's event router as @error_collector.
def setup_error_label(e)
  label = add_label(ERROR_LABEL)
  label.configure(e)

  # add_label pointed root_agent at self; swap in the proxy after configuring.
  proxy = RootAgentProxyWithoutErrorCollector.new(self)
  label.root_agent = proxy

  @error_collector = label.event_router
end
# File lib/fluent/root_agent.rb, line 120
#
# Shuts the agent down: every started input is shut down in its own thread,
# all threads are joined, then each label is shut down, then the superclass
# +shutdown+ runs. An error raised by an input's shutdown is logged as a
# warning and does not stop the remaining steps.
def shutdown
  # Shutdown Input plugin first to prevent emitting to terminated Output plugin
  workers = @started_inputs.map do |input|
    Thread.new do
      begin
        log.info "shutting down input", type: Plugin.lookup_name_from_class(input.class), plugin_id: input.plugin_id
        input.shutdown
      rescue => e
        log.warn "unexpected error while shutting down input plugin", plugin: input.class, plugin_id: input.plugin_id, error_class: e.class, error: e
        log.warn_backtrace
      end
    end
  end
  workers.each(&:join)

  @labels.each_value(&:shutdown)

  super
end
# File lib/fluent/root_agent.rb, line 107
#
# Starts the agent: the superclass +start+ runs first, then every label,
# then every input. Each input is appended to @started_inputs once its
# +start+ has returned, so +shutdown+ only touches inputs that started.
def start
  super

  @labels.each_value(&:start)

  @inputs.each do |input|
    input.start
    @started_inputs << input
  end
end
# File lib/fluent/root_agent.rb, line 141
#
# Stores +interval_time+ as the emit-error log suppression interval and
# resets the next-log time to the current epoch second.
#
# Returns the epoch second stored in @next_emit_error_log_time.
def suppress_interval(interval_time)
  @suppress_emit_error_log_interval = interval_time

  current_epoch = Time.now.to_i
  @next_emit_error_log_time = current_epoch
end