class Mongo::CollectionCommandWriter
Public Class Methods
new(collection)
click to toggle source
Calls superclass method
Mongo::CollectionWriter.new
# File lib/mongo/collection_writer.rb, line 295 def initialize(collection) super(collection) end
Public Instance Methods
bulk_execute(ops, options, opts = {})
click to toggle source
# File lib/mongo/collection_writer.rb, line 318 def bulk_execute(ops, options, opts = {}) errors = [] write_concern_errors = [] exchanges = [] ops = (options[:ordered] == false) ? sort_by_first_sym(ops) : ops # sort by write-type ordered_group_by_first(ops).each do |op_type, documents| documents.collect! {|doc| {:d => @collection.pk_factory.create_pk(doc[:d]), :ord => doc[:ord]} } if op_type == :insert error_docs, batch_errors, batch_write_concern_errors, batch_exchanges = batch_write(op_type, documents, check_keys = false, opts.merge(:ordered => options[:ordered])) errors += batch_errors write_concern_errors += batch_write_concern_errors exchanges += batch_exchanges break if options[:ordered] && !batch_errors.empty? end [errors, write_concern_errors, exchanges] end
send_write_command(op_type, selector, doc_or_docs, check_keys, opts, write_concern, collection_name=@name)
click to toggle source
# File lib/mongo/collection_writer.rb, line 299 def send_write_command(op_type, selector, doc_or_docs, check_keys, opts, write_concern, collection_name=@name) if op_type == :insert argument = [doc_or_docs].flatten(1).compact elsif op_type == :update argument = [{:q => selector, :u => doc_or_docs, :multi => !!opts[:multi]}] argument.first.merge!(:upsert => opts[:upsert]) if opts[:upsert] elsif op_type == :delete argument = [{:q => selector, :limit => (opts[:limit] || 0)}] else raise ArgumentError, "Write operation type must be :insert, :update or :delete" end request = BSON::OrderedHash[op_type, collection_name, WRITE_COMMAND_ARG_KEY[op_type], argument] request.merge!(:writeConcern => write_concern, :ordered => !opts[:continue_on_error]) request.merge!(opts) instrument(op_type, :database => @db.name, :collection => collection_name, :selector => selector, :documents => doc_or_docs) do @db.command(request) end end
Private Instance Methods
batch_message_append(message, serialized_doc, write_concern)
click to toggle source
# File lib/mongo/collection_writer.rb, line 344 def batch_message_append(message, serialized_doc, write_concern) message.push_doc!(serialized_doc) end
batch_message_initialize(message, op_type, continue_on_error, write_concern)
click to toggle source
# File lib/mongo/collection_writer.rb, line 337 def batch_message_initialize(message, op_type, continue_on_error, write_concern) message.clear!.clear @bson_empty ||= BSON::BSON_CODER.serialize({}) message.put_binary(@bson_empty.to_s) message.unfinish!.array!(WRITE_COMMAND_ARG_KEY[op_type]) end
batch_message_send(message, op_type, batch_docs, write_concern, continue_on_error)
click to toggle source
# File lib/mongo/collection_writer.rb, line 348 def batch_message_send(message, op_type, batch_docs, write_concern, continue_on_error) message.finish! request = BSON::OrderedHash[op_type, @name, :bson, message] request.merge!(:writeConcern => write_concern, :ordered => !continue_on_error) instrument(:insert, :database => @db.name, :collection => @name, :documents => batch_docs) do @db.command(request) end end
batch_write_max_sizes(write_concern)
click to toggle source
# File lib/mongo/collection_writer.rb, line 357 def batch_write_max_sizes(write_concern) [MongoClient::COMMAND_HEADROOM, MongoClient::APPEND_HEADROOM, MongoClient::SERIALIZE_HEADROOM].collect{|h| @connection.max_bson_size + h} end