class Fluent::RootAgent

Fluentd forms a tree structure to manage plugins:

                 RootAgent
                     |
        +------------+-------------+-------------+
        |            |             |             |
     <label>      <source>      <filter>      <match>
        |
   +----+----+
   |         |
<filter>   <match>

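Relation:

* RootAgent has many <label>, <source>, <filter> and <match>
* <label> has many <filter> and <match>

Next step: `fluentd/agent.rb`
Next step: `fluentd/label.rb`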

Constants

ERROR_LABEL

Attributes

inputs[R]
labels[R]

Public Class Methods

new(system_config = SystemConfig.new) click to toggle source
Calls superclass method Fluent::Agent.new
# File lib/fluent/root_agent.rb, line 50
# Builds an empty root agent: no labels, no inputs, and error-log
# suppression switched off (interval 0) until the system configuration
# provides an interval.
#
# system_config - object responding to `emit_error_log_interval` and
#                 `without_source`. A nil from either accessor leaves the
#                 corresponding setting untouched.
def initialize(system_config = SystemConfig.new)
  super

  @labels = {}
  @inputs = []
  @started_inputs = []
  @suppress_emit_error_log_interval = 0
  @next_emit_error_log_time = nil

  interval = system_config.emit_error_log_interval
  suppress_interval(interval) unless interval.nil?

  without_source = system_config.without_source
  @without_source = without_source unless without_source.nil?
end

Public Instance Methods

add_label(name) click to toggle source
# File lib/fluent/root_agent.rb, line 160
# Registers a new <label> section under this root agent.
#
# name - the label name; it is the key under which the label is stored in
#        @labels.
#
# Returns the newly created Label, whose root agent is this object.
# Raises ConfigError when a label with the same name is already registered.
# Without this check the second registration would silently replace the
# first one and the earlier section would be lost without any diagnostic.
def add_label(name)
  raise ConfigError, "Section <label #{name}> appears twice" if @labels.key?(name)

  label = Label.new(name)
  label.root_agent = self
  @labels[name] = label
end
add_source(type, conf) click to toggle source
# File lib/fluent/root_agent.rb, line 146
# Creates, wires and configures one <source> plugin.
#
# type - plugin type name handed to Plugin.new_input.
# conf - the <source> configuration element passed to the plugin.
#
# Returns the configured input, which is also appended to @inputs.
def add_source(type, conf)
  log.info "adding source", type: type

  Plugin.new_input(type).tap do |source|
    # The router must be assigned before `configure`: a <source> emits to the
    # top-level event router (RootAgent#event_router) by default, and
    # Input#configure replaces it with a label's event router when the
    # `@label` parameter is present. See also 'fluentd/plugin/input.rb'.
    source.router = @event_router
    source.configure(conf)
    @inputs << source
  end
end
configure(conf) click to toggle source
Calls superclass method Fluent::Agent#configure
# File lib/fluent/root_agent.rb, line 66
# Configures the plugin tree from the parsed configuration root.
#
# Steps, in order:
#   1. every <label> section other than ERROR_LABEL is registered with
#      add_label, and only then configured, to avoid 'label not found' in
#      input, filter and output plugins;
#   2. the ERROR_LABEL section, if present, is wired via setup_error_label;
#   3. the superclass `configure` runs (via `super`);
#   4. each <source> section is added, unless @without_source is set.
#
# conf - configuration element exposing `elements`; each child is read
#        through `name`, `arg` and `[]`.
#
# Raises ConfigError when a <label> has an empty argument or a <source> has
# neither '@type' nor 'type'.
def configure(conf)
  error_label_config = nil
  label_configs = {}

  conf.elements.each do |element|
    next unless element.name == 'label'

    label_name = element.arg
    raise ConfigError, "Missing symbol argument on <label> directive" if label_name.empty?

    if label_name == ERROR_LABEL
      # Kept aside: the error label is set up separately below.
      error_label_config = element
    else
      add_label(label_name)
      label_configs[label_name] = element
    end
  end

  # Call 'configure' only after all labels exist, to avoid 'label not found'.
  label_configs.each_pair do |label_name, element|
    @labels[label_name].configure(element)
  end
  setup_error_label(error_label_config) if error_label_config

  super

  if @without_source
    log.info "'--without-source' is applied. Ignore <source> sections"
  else
    sources = conf.elements.select { |element| element.name == 'source' }
    sources.each do |element|
      # '@type' takes precedence; the bare 'type' key is accepted as a fallback.
      type = element['@type'] || element['type']
      raise ConfigError, "Missing 'type' parameter on <source> directive" unless type
      add_source(type, element)
    end
  end
end
emit_error_event(tag, time, record, error) click to toggle source
# File lib/fluent/root_agent.rb, line 174
# Reports a single event that failed processing.
#
# When @error_collector is set (the <label @ERROR> router), the event is
# emitted to it and only a short notice is logged. Otherwise the event,
# including its record, is dumped to the log.
#
# tag, time, record - the failed event.
# error             - the exception describing the failure.
def emit_error_event(tag, time, record, error)
  details = {error_class: error.class, error: error.to_s, tag: tag, time: time}

  unless @error_collector
    # No collector will receive the record, so the log line carries it.
    details[:record] = record
    return log.warn("dump an error event:", details)
  end

  # The record is left out of this notice because <@ERROR> handles it.
  log.warn "send an error event to @ERROR:", details
  @error_collector.emit(tag, time, record)
end
find_label(label_name) click to toggle source
# File lib/fluent/root_agent.rb, line 166
# Looks up a registered label by name.
#
# Returns the label stored in @labels under label_name.
# Raises ArgumentError when nothing (or a falsy value) is stored there.
def find_label(label_name)
  found = @labels[label_name]
  raise ArgumentError, "#{label_name} label not found" unless found

  found
end
handle_emits_error(tag, es, error) click to toggle source
# File lib/fluent/root_agent.rb, line 186
# Handles a failure raised while emitting an event stream.
#
# With @error_collector set, the whole stream is forwarded to it and the
# error is not re-raised. Otherwise the failure is logged and re-raised.
# The log line is skipped while the suppress interval has not elapsed; the
# error is raised regardless.
#
# tag   - tag of the stream.
# es    - the event stream that failed.
# error - the exception to report and, without a collector, re-raise.
def handle_emits_error(tag, es, error)
  details = {error_class: error.class, error: error.to_s, tag: tag}

  if @error_collector
    log.warn "send an error event stream to @ERROR:", details
    return @error_collector.emit_stream(tag, es)
  end

  current_time = Engine.now
  unsuppressed = @suppress_emit_error_log_interval.zero?
  if unsuppressed || current_time > @next_emit_error_log_time
    log.warn "emit transaction failed:", details
    log.warn_backtrace
    # Earliest time after which the next failure may be logged again.
    @next_emit_error_log_time = current_time + @suppress_emit_error_log_interval
  end
  raise error
end
setup_error_label(e) click to toggle source
# File lib/fluent/root_agent.rb, line 100
# Creates and configures the ERROR_LABEL label and remembers its event
# router as @error_collector.
#
# e - the configuration element of the error label section.
#
# The label is configured first; only afterwards is its root agent replaced
# by a RootAgentProxyWithoutErrorCollector wrapping this agent.
def setup_error_label(e)
  label = add_label(ERROR_LABEL)
  label.configure(e)

  proxy = RootAgentProxyWithoutErrorCollector.new(self)
  label.root_agent = proxy

  @error_collector = label.event_router
end
shutdown() click to toggle source
Calls superclass method Fluent::Agent#shutdown
# File lib/fluent/root_agent.rb, line 120
# Stops the plugin tree: started inputs first, then labels, then the
# superclass `shutdown`.
#
# Input plugins go first to prevent emitting to an already terminated
# Output plugin. Every started input is shut down on its own thread, and all
# threads are joined before the labels are touched. A StandardError raised
# by one input is logged as a warning and does not stop the others.
def shutdown
  stoppers = @started_inputs.map do |input|
    Thread.new(input) do |plugin|
      begin
        log.info "shutting down input", type: Plugin.lookup_name_from_class(plugin.class), plugin_id: plugin.plugin_id
        plugin.shutdown
      rescue => err
        log.warn "unexpected error while shutting down input plugin", plugin: plugin.class, plugin_id: plugin.plugin_id, error_class: err.class, error: err
        log.warn_backtrace
      end
    end
  end
  stoppers.each(&:join)

  @labels.each_value { |label| label.shutdown }

  super
end
start() click to toggle source
Calls superclass method Fluent::Agent#start
# File lib/fluent/root_agent.rb, line 107
# Starts the plugin tree: the superclass `start` first, then every label,
# then every input in @inputs.
#
# An input is appended to @started_inputs right after its `start` returns,
# so an input whose `start` raises is never recorded there.
def start
  super

  @labels.each_value { |label| label.start }

  @inputs.each do |input|
    input.start
    @started_inputs << input
  end
end
suppress_interval(interval_time) click to toggle source
# File lib/fluent/root_agent.rb, line 141
# Enables rate limiting of emit-error log lines.
#
# interval_time - value stored as the suppress interval; handle_emits_error
#                 adds it to the current time to compute the next time a
#                 failure may be logged.
#
# Also resets the next allowed log time to the current epoch second, which
# is the value returned.
def suppress_interval(interval_time)
  @suppress_emit_error_log_interval = interval_time

  current_epoch = Time.now.to_i
  @next_emit_error_log_time = current_epoch
end