class Fluent::BasicBuffer

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Buffer.new
# File lib/fluent/buffer.rb, line 134
# Builds an unstarted buffer. Both chunk containers are left nil here (they
# are assigned by #start) and parallel popping is switched on by default.
def initialize
  super

  # chunks to store data
  @map = nil

  # chunks to be flushed
  @queue = nil

  @parallel_pop = true
end

Public Instance Methods

clear!() click to toggle source
# File lib/fluent/buffer.rb, line 357
# Purges every chunk currently in the flush queue, in queue order, and drops
# each one from the queue as it goes. Returns the (now emptied) queue.
def clear!
  @queue.delete_if do |queued|
    queued.purge
    true # always remove the chunk, whatever purge returned
  end
end
configure(conf) click to toggle source
Calls superclass method Fluent::Buffer#configure
# File lib/fluent/buffer.rb, line 160
# Passes +conf+ to the superclass implementation, then logs a warning when
# the resulting @buffer_queue_full_action is :block.
def configure(conf)
  super

  return unless @buffer_queue_full_action == :block

  $log.warn "'block' action stops input process until the buffer full is resolved. Check your pipeline this action is fit or not"
end
emit(key, data, chain) click to toggle source
# File lib/fluent/buffer.rb, line 190
# Stores +data+ in the chunk kept under +key+, rolling over to a fresh chunk
# when the current one cannot take it.
#
# key   - converted with #to_s and used as the key into @map.
# data  - payload; #bytesize is read from it and it is appended with <<.
# chain - #next is called on it on each path that stores the data.
#
# Returns false when the data was appended to the existing chunk. On
# rollover, returns true only if @queue was empty just before the old chunk
# was queued (see the comment above the final return).
#
# Raises BufferQueueLimitError when the data does not fit, the queue has
# reached @buffer_queue_limit, and @buffer_queue_full_action is :exception.
def emit(key, data, chain)
  key = key.to_s

  # Everything below runs while holding this buffer's own lock.
  synchronize do
    begin
      # chunk unique id is generated in #new_chunk
      chunk = (@map[key] ||= new_chunk(key))

      if storable?(chunk, data)
        # Fast path: the current chunk still has room.
        chain.next
        chunk << data
        return false

      elsif @queue.size >= @buffer_queue_limit
        # Raised only to dispatch on the configured action in the rescue below.
        raise BufferQueueLimitError, "queue size exceeds limit"
      end
      # Not storable but the queue has room: fall through to the rollover.
    rescue BufferQueueLimitError => e
      case @buffer_queue_full_action
      when :exception
        raise e
      when :block
        # This is rough implementation. New Buffer API should improve this routine by using wait/signal
        $log.debug "buffer queue is full. Wait 1 second to re-emit events"
        sleep 1
        # Re-runs the begin block from the top (chunk lookup and both checks),
        # still inside synchronize.
        retry
      when :drop_oldest_chunk
        $log.debug "buffer queue is full. Dropping oldest chunk"
        # pop skips write_chunk when its argument is nil, so the popped chunk
        # is removed and purged without being written.
        pop(nil)
      end
      # Any other action value matches no branch and falls through to the
      # rollover below.
    end

    # Oversized data only produces warnings; it is still stored below.
    if data.bytesize > @buffer_chunk_limit
      $log.warn "Size of the emitted data exceeds buffer_chunk_limit."
      $log.warn "This may occur problems in the output plugins ``at this server.``"
      $log.warn "To avoid problems, set a smaller number to the buffer_chunk_limit"
      $log.warn "in the forward output ``at the log forwarding server.``"
      ### TODO
      # raise BufferChunkLimitError, "received data too large"
    end

    # chunk unique id is generated in #new_chunk
    nc = new_chunk(key)
    # Flipped to true only after the swap succeeded; read by the ensure below.
    ok = false

    begin
      nc << data
      chain.next

      flush_trigger = false
      @queue.synchronize {
        enqueue(chunk) # this is buffer enqueue *hook*
        # Sampled before the append so it reflects the queue's prior state.
        flush_trigger = @queue.empty?
        @queue << chunk # actual enqueue
        @map[key] = nc
      }

      ok = true
      # false: queue have 1 or more chunks before this emit
      #        so this enqueue is not a trigger to flush
      # true: queue have no chunks before this emit
      #       so this enqueue is a trigger to flush this buffer ASAP
      return flush_trigger
    ensure
      # Discard the new chunk if anything above raised before the swap finished.
      nc.purge unless ok
    end

  end  # synchronize
end
enable_parallel(b=true) click to toggle source
# File lib/fluent/buffer.rb, line 141
# Sets the @parallel_pop flag that #pop consults: truthy lets #pop take any
# queued chunk whose monitor it can enter, falsy restricts it to the head of
# the queue.
#
# b - the new flag value (defaults to true).
#
# Returns the value that was assigned.
def enable_parallel(b = true)
  @parallel_pop = b
end
enqueue(chunk) click to toggle source

Enqueueing is done by push; this method is actually the 'enqueue_hook'.

# File lib/fluent/buffer.rb, line 292
# Hook called with a chunk immediately before that chunk is appended to
# @queue (both #emit and #push call it). This base version has no behavior.
#
# Raises NotImplementedError unconditionally.
def enqueue(chunk)
  reason = "Implement this method in child class"
  raise NotImplementedError, reason
end
keys() click to toggle source
# File lib/fluent/buffer.rb, line 259
# Returns the keys of @map, i.e. the keys that currently have a chunk stored
# under them. @map is nil until #start has run, so calling this earlier fails.
def keys
  @map.keys
end
new_chunk(key) click to toggle source
# File lib/fluent/buffer.rb, line 282
# Factory for the chunk stored under +key+; #emit calls it whenever a key
# needs a fresh chunk. This base version has no behavior.
#
# Raises NotImplementedError unconditionally.
def new_chunk(key)
  reason = "Implement this method in child class"
  raise NotImplementedError, reason
end
pop(out) click to toggle source

Shift a chunk from the queue, write it, and purge it. Returns a boolean indicating whether this buffer has more chunks to be flushed.

# File lib/fluent/buffer.rb, line 316
# Takes one chunk from the flush queue, writes it to +out+, removes it from
# the queue and purges it.
#
# When @parallel_pop is truthy, the first queued chunk whose try_mon_enter
# succeeds is taken; otherwise only the head of the queue is considered.
#
# out - destination handed to write_chunk. When it is nil (or the chunk is
#       empty) the write is skipped, but the chunk is still removed and purged.
#
# Returns false when no chunk could be taken (empty queue, or the chunk's
# monitor could not be entered). Otherwise returns true if chunks remain in
# the queue after the removal, false if the queue is now empty.
def pop(out)
  chunk = nil
  @queue.synchronize do
    if @parallel_pop
      chunk = @queue.find { |c| c.try_mon_enter }
    else
      head = @queue.first
      chunk = head if head && head.try_mon_enter
    end
  end
  return false unless chunk

  begin
    # #push never queues an empty chunk, but #emit queues the rolled-over
    # chunk without checking emptiness, so keep this guard.
    write_chunk(chunk, out) unless chunk.empty? || out.nil?

    queue_empty = false
    @queue.synchronize do
      # Remove by identity: only this exact chunk object leaves the queue.
      @queue.delete_if { |c| c.equal?(chunk) }
      queue_empty = @queue.empty?
    end

    chunk.purge

    # Tells the caller whether to flush once more immediately.
    !queue_empty
  ensure
    # Leave the chunk monitor entered via try_mon_enter, even when the write
    # or the purge raised.
    chunk.mon_exit
  end
end
push(key) click to toggle source

Get the chunk specified by key and push it into the queue.

# File lib/fluent/buffer.rb, line 297
# Moves the chunk stored under +key+ from @map into the flush queue.
#
# Returns false, changing nothing, when no chunk is stored under +key+ or
# the stored chunk is empty. Returns true once the chunk has been queued and
# its @map entry deleted.
def push(key)
  synchronize do
    pending = @map[key]
    return false unless pending && !pending.empty?

    @queue.synchronize do
      enqueue(pending) # hook runs before the actual append
      @queue << pending
      @map.delete(key)
    end

    return true
  end
end
queue_size() click to toggle source
# File lib/fluent/buffer.rb, line 263
# Returns the number of chunks currently in the flush queue (@queue.size).
# The queue's monitor is not taken for this read.
def queue_size
  @queue.size
end
resume() click to toggle source
# File lib/fluent/buffer.rb, line 286
# Supplies the initial state for #start, which destructures the result into
# @queue (first) and @map (second). This base version has no behavior.
#
# Raises NotImplementedError unconditionally.
def resume
  reason = "Implement this method in child class"
  raise NotImplementedError, reason
end
shutdown() click to toggle source
# File lib/fluent/buffer.rb, line 173
# Closes every chunk this buffer holds. Queued chunks are shifted off @queue
# and closed one at a time, leaving the queue empty; then each chunk still in
# @map is closed. Entries in @map are not removed.
def shutdown
  synchronize do
    @queue.synchronize do
      @queue.shift.close until @queue.empty?
    end

    @map.each_pair { |_key, staged| staged.close }
  end
end
start() click to toggle source
# File lib/fluent/buffer.rb, line 168
# Loads the buffer state returned by #resume and makes the queue lockable.
# The first element of the result becomes @queue, the second becomes @map;
# @queue is then extended with MonitorMixin so that it responds to
# #synchronize.
#
# Returns the extended queue.
def start
  restored_queue, restored_map = resume
  @queue = restored_queue
  @map = restored_map
  @queue.extend(MonitorMixin)
end
storable?(chunk, data) click to toggle source
# File lib/fluent/buffer.rb, line 186
# Reports whether +data+ can be appended to +chunk+ without going past
# @buffer_chunk_limit: true when chunk.size plus data.bytesize is at most the
# limit, false otherwise.
def storable?(chunk, data)
  size_after_append = chunk.size + data.bytesize
  size_after_append <= @buffer_chunk_limit
end
total_queued_chunk_size() click to toggle source
# File lib/fluent/buffer.rb, line 267
# Sums #size over every chunk this buffer holds. Despite the name, the total
# covers both the chunks stored in @map and the chunks waiting in @queue.
# The buffer lock is held for the whole sum; the queue lock only for the
# queue part.
def total_queued_chunk_size
  bytes = 0

  synchronize do
    @map.each_value { |staged| bytes += staged.size }

    @queue.synchronize do
      @queue.each { |queued| bytes += queued.size }
    end
  end

  bytes
end
write_chunk(chunk, out) click to toggle source
# File lib/fluent/buffer.rb, line 353
# Hands +chunk+ to +out+ by calling out.write(chunk) and returns whatever
# that call returns. #pop calls this for non-empty chunks when its output
# argument is not nil.
def write_chunk(chunk, out)
  out.write(chunk)
end