class Fluent::NewTailInput

Attributes

paths[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Input.new
# File lib/fluent/plugin/in_tail.rb, line 27
# Creates the input with empty state: no configured paths and no active
# watchers. Both are filled in later (@paths by #configure, @tails by
# #start_watchers / #update_watcher).
def initialize
  super
  # path => watcher for every file currently being tailed
  @tails = {}
  # path patterns taken from the 'path' parameter
  @paths = []
end

Public Instance Methods

close_watcher(tw, close_io = true) click to toggle source

Fluent::NewTailInput::TailWatcher#close is called by another thread during the shutdown phase. That causes a 'can't modify string; temporarily locked' error in IOHandler, so the close_io argument was added to avoid this problem. At shutdown, IOHandler's io is released automatically after it is detached from the event loop.

# File lib/fluent/plugin/in_tail.rb, line 219
# Closes a watcher, flushes whatever is left in its line buffer and, when the
# watcher was flagged as unwatched and a position file is in use, records
# PositionFile::UNWATCHED_POSITION for its path.
#
# @param tw the watcher to close
# @param close_io forwarded to tw.close
def close_watcher(tw, close_io = true)
  tw.close(close_io)
  flush_buffer(tw)

  # Nothing to record unless the file was dropped from the watch list and
  # positions are being persisted.
  return unless tw.unwatched && @pf

  @pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
end
close_watcher_after_rotate_wait(tw) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 227
# Schedules a deferred close of +tw+: builds a TailWatcher::Closer configured
# with @rotate_wait and #close_watcher as its callback, and attaches it to the
# event loop.
#
# @param tw the watcher to close later
def close_watcher_after_rotate_wait(tw)
  TailWatcher::Closer.new(@rotate_wait, tw, log, &method(:close_watcher)).attach(@loop)
end
configure(conf) click to toggle source
Calls superclass method Fluent::Input#configure
# File lib/fluent/plugin/in_tail.rb, line 66
# Validates the tail settings and prepares parsing.
#
# Splits the comma-separated 'path' value into @paths, warns when no pos_file
# is set, configures the parser and tag, and selects the line handler
# (#parse_multilines when the format name contains "multiline", otherwise
# #parse_singleline).
#
# @param conf configuration object; conf['format'] is read here
# @raise [ConfigError] when 'path' yields no non-blank entry
def configure(conf)
  super

  # Drop blank entries (from input such as "a, ,b" or a whitespace-only value)
  # so they cannot slip past the "path is required" check and end up being
  # tailed as the empty path.
  @paths = @path.split(',').map { |path| path.strip }.reject { |path| path.empty? }
  if @paths.empty?
    raise ConfigError, "tail: 'path' parameter is required on tail input"
  end

  unless @pos_file
    # Use the same `log` accessor as the rest of this class.
    log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
    log.warn "this parameter is highly recommended to save the position to resume tailing."
  end

  configure_parser(conf)
  configure_tag

  # =~ yields a match index (possibly 0, which is truthy in Ruby) or nil.
  @multiline_mode = conf['format'] =~ /multiline/
  @receive_handler = method(@multiline_mode ? :parse_multilines : :parse_singleline)
end
configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 90
# Instantiates the parser named by conf['format'] and hands it the full
# configuration.
#
# @param conf configuration object; conf['format'] selects the parser
def configure_parser(conf)
  format_name = conf['format']
  @parser = Plugin.new_parser(format_name)
  @parser.configure(conf)
end
configure_tag() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 95
# Derives @tag_prefix / @tag_suffix from a tag containing '*'.
#
# When @tag contains '*', the text before the first '*' becomes the prefix and
# the text after it (up to any further '*') becomes the suffix; both are always
# Strings in that case. Without a '*', both are set to nil and @tag is used
# as-is by the emitting code.
def configure_tag
  if @tag.include?('*')
    @tag_prefix, @tag_suffix = @tag.split('*')
    # split drops trailing empty fields: "foo.*" gives ["foo."] and "*" gives
    # [], so either side may come back nil. Default both to '' so that the
    # later prefix + tag + suffix concatenation cannot hit nil.
    @tag_prefix ||= ''
    @tag_suffix ||= ''
  else
    @tag_prefix = nil
    @tag_suffix = nil
  end
end
convert_line_to_event(line, es, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 281
# Parses one line and appends the resulting event to +es+.
#
# The line is chomped in place and, when @encoding is set, re-tagged with that
# encoding. Lines the parser cannot match are logged as warnings; any
# exception raised while handling the line is logged and swallowed.
#
# @param line [String] raw line; mutated by this method
# @param es event stream receiving (time, record) pairs via #add
# @param tail_watcher watcher whose path is stored under @path_key, if set
def convert_line_to_event(line, es, tail_watcher)
  line.chomp!  # remove \n
  line.force_encoding(@encoding) if @encoding

  @parser.parse(line) do |time, record|
    next log.warn("pattern not match: #{line.inspect}") unless time && record

    # Keep a value the parser already put under @path_key.
    record[@path_key] ||= tail_watcher.path unless @path_key.nil?
    es.add(time, record)
  end
rescue => error
  log.warn line.dump, error: error.to_s
  log.debug_backtrace(error.backtrace)
end
expand_paths() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 129
# Resolves the configured path patterns into the list of files to tail.
#
# Every entry of @paths and @exclude_path is first passed through
# Time#strftime with the current time, then entries containing '*' are
# expanded with Dir.glob. Globbed files that are not readable are skipped
# with a warning. Static (non-glob) paths are kept even if the file does not
# exist yet.
#
# @return [Array<String>] unique target paths with the excluded ones removed
def expand_paths
  date = Time.now

  excluded = @exclude_path.map { |pattern|
    expanded = date.strftime(pattern)
    expanded.include?('*') ? Dir.glob(expanded) : expanded
  }.flatten.uniq

  paths = []
  @paths.each { |pattern|
    expanded = date.strftime(pattern)
    if expanded.include?('*')
      readable = Dir.glob(expanded).select { |p|
        if File.readable?(p)
          true
        else
          log.warn "#{p} unreadable. It is excluded and would be examined next time."
          false
        end
      }
      paths.concat(readable)
    else
      # When file is not created yet, Dir.glob returns an empty array. So just add when path is static.
      paths << expanded
    end
  }

  # Array#- keeps duplicates of the receiver. Without uniq, a file matched by
  # two patterns would be returned twice and start_watchers would create two
  # watchers for it, overwriting the first in @tails.
  (paths - excluded).uniq
end
flush_buffer(tw) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 232
# Parses and emits whatever is pending in the watcher's line buffer.
#
# Does nothing when the watcher has no buffered text. A buffer the parser
# cannot turn into an event is reported with a warning instead of being
# emitted.
#
# @param tw watcher providing #line_buffer, #tag and #path
def flush_buffer(tw)
  lb = tw.line_buffer
  return unless lb

  lb.chomp!
  lb.force_encoding(@encoding) if @encoding
  @parser.parse(lb) { |time, record|
    if time && record
      tag = if @tag_prefix || @tag_suffix
              # Interpolation instead of String#+ so a nil prefix or suffix
              # contributes nothing rather than raising NoMethodError.
              "#{@tag_prefix}#{tw.tag}#{@tag_suffix}"
            else
              @tag
            end
      # Keep a value the parser already put under @path_key.
      record[@path_key] ||= tw.path unless @path_key.nil?
      router.emit(tag, time, record)
    else
      log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}"
    end
  }
end
parse_multilines(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 307
# Folds incoming lines into multi-line events.
#
# The watcher's line buffer carries the not-yet-complete event between calls
# and is written back before returning.
#
# * If the parser defines a first-line pattern, a matching line completes the
#   buffered event (if any) and starts a new one; other lines are appended to
#   the buffer, or warned about when nothing has been buffered yet.
# * Otherwise each line is appended to the buffer and the buffer is emitted as
#   soon as the parser can turn it into a (time, record) pair.
#
# @param lines [Array<String>] newly read lines
# @param tail_watcher watcher providing #line_buffer, #line_buffer_timer_flusher, #path
# @return [MultiEventStream] events completed by this batch
def parse_multilines(lines, tail_watcher)
  events = MultiEventStream.new
  pending = tail_watcher.line_buffer

  if @parser.has_firstline?
    flusher = tail_watcher.line_buffer_timer_flusher
    flusher.reset_timer if flusher

    lines.each do |line|
      if @parser.firstline?(line)
        convert_line_to_event(pending, events, tail_watcher) if pending
        pending = line
      elsif pending.nil?
        log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
      else
        pending << line
      end
    end
  else
    pending ||= ''
    lines.each do |line|
      pending << line
      @parser.parse(pending) do |time, record|
        next unless time && record

        convert_line_to_event(pending, events, tail_watcher)
        # Start a fresh buffer; the emitted one must not be appended to.
        pending = ''
      end
    end
  end

  tail_watcher.line_buffer = pending
  events
end
parse_singleline(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 299
# Converts every line into an event, one event per line.
#
# @param lines [Array<String>] newly read lines
# @param tail_watcher watcher the lines came from
# @return [MultiEventStream] stream filled by #convert_line_to_event
def parse_singleline(lines, tail_watcher)
  lines.each_with_object(MultiEventStream.new) do |line, events|
    convert_line_to_event(line, events, tail_watcher)
  end
end
receive_lines(lines, tail_watcher) click to toggle source

@return true if the emit action succeeds or fails with an unrecoverable error; false if it raises BufferQueueLimitError.

# File lib/fluent/plugin/in_tail.rb, line 260
# Turns a batch of lines into events and emits them as one stream.
#
# @param lines [Array<String>] newly read lines
# @param tail_watcher watcher the lines came from; its #tag fills the '*' of a wildcard tag
# @return [Boolean] false only when emitting raised BufferQueueLimitError;
#   true when there was nothing to emit, the emit succeeded, or it failed
#   with any other error
def receive_lines(lines, tail_watcher)
  es = @receive_handler.call(lines, tail_watcher)
  return true if es.empty?

  tag = if @tag_prefix || @tag_suffix
          # Interpolation instead of String#+ so a nil prefix or suffix
          # contributes nothing rather than raising NoMethodError.
          "#{@tag_prefix}#{tail_watcher.tag}#{@tag_suffix}"
        else
          @tag
        end

  begin
    router.emit_stream(tag, es)
    true
  rescue BufferQueueLimitError
    false
  rescue
    # ignore non BufferQueueLimitError errors because in_tail can't recover. Engine shows logs and backtraces.
    true
  end
end
refresh_watchers() click to toggle source

in_tail with a '*' path does not check whether a rotated file is the same file during the refresh phase, so you should not use a '*' path when your logs are rotated by another tool: it causes log duplication once the watched files are updated. In that case, keep rotated logs in a separate directory and specify two paths in the path parameter, e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file

# File lib/fluent/plugin/in_tail.rb, line 158
# Reconciles the active watchers with the current result of #expand_paths:
# paths that are no longer targets are stopped (non-immediately, flagged as
# unwatched) and newly appearing paths get a watcher.
def refresh_watchers
  wanted = expand_paths
  watching = @tails.keys

  stale = watching - wanted
  fresh = wanted - watching

  unless stale.empty?
    stop_watchers(stale, false, true)
  end

  unless fresh.empty?
    start_watchers(fresh)
  end
end
run() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 252
# Runs the event loop until it stops. Any StandardError escaping the loop is
# logged together with its backtrace instead of propagating out of the thread.
def run
  @loop.run
rescue => error
  log.error "unexpected error", error: error.to_s
  log.error_backtrace
end
setup_watcher(path, pe) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 169
# Builds a TailWatcher for +path+, attaches it to the event loop and returns it.
#
# A LineBufferTimerFlusher (calling #flush_buffer) is created only when both
# multiline mode and @multiline_flush_interval are set; otherwise nil is
# passed in its place.
#
# @param path [String] file to tail
# @param pe position entry for the file, or nil when no position file is used
# @return [TailWatcher] the attached watcher
def setup_watcher(path, pe)
  flusher = nil
  if @multiline_mode && @multiline_flush_interval
    flusher = TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer))
  end

  watcher = TailWatcher.new(
    path, @rotate_wait, pe, log,
    @read_from_head, @enable_watch_timer, @read_lines_limit,
    method(:update_watcher), flusher,
    &method(:receive_lines)
  )
  watcher.attach(@loop)
  watcher
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 120
# Stops tailing: detaches the refresh timer if it is attached, closes every
# watcher immediately, stops the event loop, waits for the loop thread and
# closes the position file if one was opened.
def shutdown
  trigger = @refresh_trigger
  trigger.detach if trigger && trigger.attached?

  stop_watchers(@tails.keys, true)

  begin
    @loop.stop
  rescue
    # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception.
    nil
  end

  @thread.join
  @pf_file.close if @pf_file
end
start() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 105
# Starts tailing: opens and parses the position file when @pos_file is set,
# creates the event loop, performs an initial #refresh_watchers, attaches a
# timer that calls #refresh_watchers again using @refresh_interval, and runs
# the loop (#run) in a new thread.
def start
  if @pos_file
    open_flags = File::RDWR | File::CREAT
    @pf_file = File.open(@pos_file, open_flags, DEFAULT_FILE_PERMISSION)
    @pf_file.sync = true
    @pf = PositionFile.parse(@pf_file)
  end

  @loop = Coolio::Loop.new
  # Watchers for the files that exist right now.
  refresh_watchers

  @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, log, &method(:refresh_watchers))
  @refresh_trigger.attach(@loop)

  @thread = Thread.new(&method(:run))
end
start_watchers(paths) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 176
# Creates a watcher for each path and registers it in @tails.
#
# When a position file is in use, the path's position entry is handed to the
# watcher. If @read_from_head is set and the entry has no inode recorded yet
# (read_inode is zero), the entry is initialised with the file's current inode
# and position 0. A file that does not exist yet only produces a warning; the
# watcher is still set up.
#
# @param paths [Array<String>] files to start tailing
def start_watchers(paths)
  paths.each { |path|
    pe = nil
    if @pf
      pe = @pf[path]
      if @read_from_head && pe.read_inode.zero?
        begin
          pe.update(File::Stat.new(path).ino, 0)
        rescue Errno::ENOENT
          # Use the same `log` accessor as the rest of this class.
          log.warn "#{path} not found. Continuing without tailing it."
        end
      end
    end

    @tails[path] = setup_watcher(path, pe)
  }
end
stop_watchers(paths, immediate = false, unwatched = false) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 194
# Removes the watchers for +paths+ from @tails and closes them.
#
# Paths without a registered watcher are skipped.
#
# @param paths [Array<String>] files to stop tailing
# @param immediate when true, close right away (without closing the io);
#   otherwise schedule the close via #close_watcher_after_rotate_wait
# @param unwatched value stored in each watcher's #unwatched flag before closing
def stop_watchers(paths, immediate = false, unwatched = false)
  paths.each do |path|
    watcher = @tails.delete(path)
    next unless watcher

    watcher.unwatched = unwatched
    if immediate
      close_watcher(watcher, false)
    else
      close_watcher_after_rotate_wait(watcher)
    end
  end
end
update_watcher(path, pe) click to toggle source

#refresh_watchers calls @tails.keys, so for safety we do not use a stop_watcher -> start_watcher sequence here.

# File lib/fluent/plugin/in_tail.rb, line 209
# Replaces the watcher registered for +path+ with a freshly set up one and
# schedules the previous watcher, if any, to be closed after the rotate wait.
# The @tails entry is overwritten in place rather than deleted and re-added.
#
# @param path [String] the tailed file
# @param pe position entry passed on to the new watcher
def update_watcher(path, pe)
  previous = @tails[path]
  @tails[path] = setup_watcher(path, pe)
  return unless previous

  close_watcher_after_rotate_wait(previous)
end