class Mongo::CollectionOperationWriter
Public Class Methods
new(collection)
click to toggle source
Calls superclass method
Mongo::CollectionWriter.new
# File lib/mongo/collection_writer.rb, line 199
# Creates the writer for +collection+. Nothing is done here beyond handing
# the collection to the superclass constructor.
#
# @param collection the collection this writer operates on
def initialize(collection)
  # Bare `super` forwards the method's own arguments unchanged.
  super
end
Public Instance Methods
bulk_execute(ops, options, opts = {})
click to toggle source
# File lib/mongo/collection_writer.rb, line 234
# Executes each entry of +ops+ as an individual write through
# +@collection.operation_writer+ and collects the outcome of every attempt.
#
# @param ops [Enumerable] pairs of +[op_type, doc]+; +doc+ is a Hash that is
#   read for the keys +:d+, +:q+, +:u+ and +:ord+
# @param options [Hash] only +:ordered+ is read here; when truthy, iteration
#   stops after a serialization error or an operation failure (unless that
#   failure's result has "err" equal to "norepl")
# @param opts [Hash] merged over each operation's hash before the write, and
#   also passed to +get_write_concern+
# @return [Array] +[errors, write_concern_errors, exchanges]+
def bulk_execute(ops, options, opts = {})
  write_concern = get_write_concern(opts, @collection)
  failures = []
  concern_failures = []
  log = []

  # Appends one exchange record; every successful or server-rejected write
  # is logged with the same four keys.
  record = lambda do |kind, entry, reply|
    log << {:op_type => kind, :batch => [entry], :opts => opts, :response => reply}
  end

  ops.each do |op_type, doc|
    if op_type == :insert
      # Inserts get their document passed through the collection's pk factory.
      doc = {:d => @collection.pk_factory.create_pk(doc[:d]), :ord => doc[:ord]}
    end

    settings = doc.merge(opts)
    document = settings.delete(:d)
    selector = settings.delete(:q)
    update = settings.delete(:u)

    begin
      # use single and NOT batch inserts since there is no index for an error
      # (check_keys is passed as false)
      response = @collection.operation_writer.send_write_operation(
        op_type, selector, document || update, false, settings, write_concern)
      record.call(op_type, doc, response)
    rescue BSON::InvalidDocument, BSON::InvalidKeyName, BSON::InvalidStringEncoding => ex
      # Serialization problems are wrapped; no exchange is recorded for them.
      bulk_message = "Bulk write error - #{ex.message} - examine result for complete information"
      failures << BulkWriteError.new(bulk_message, Mongo::ErrorCode::INVALID_BSON,
                                     {:op_type => op_type, :serialize => doc, :ord => doc[:ord], :error => ex})
      break if options[:ordered]
    rescue Mongo::WriteConcernError => ex
      # Write-concern errors never stop the run, even when ordered.
      concern_failures << ex
      record.call(op_type, doc, ex.result)
    rescue Mongo::OperationFailure => ex
      failures << ex
      record.call(op_type, doc, ex.result)
      break if options[:ordered] && ex.result["err"] != "norepl"
    end
  end

  [failures, concern_failures, log]
end
send_write_operation(op_type, selector, doc_or_docs, check_keys, opts, write_concern, collection_name=@name)
click to toggle source
# File lib/mongo/collection_writer.rb, line 203
# Assembles a single write message for +op_type+ and sends it over
# +@connection+, wrapped in an +instrument+ call.
#
# Message layout written here, in order: a leading int (1 only for an insert
# with +opts[:continue_on_error]+ set, otherwise 0), the "db.collection"
# name, an op-specific flags int for :update / :delete, the serialized
# selector (when given), then each serialized document.
#
# @param op_type [Symbol] :insert, :update or :delete are handled specially;
#   also used as the key into +OPCODE+
# @param selector serialized without key checking when non-nil
# @param doc_or_docs a single document or an array of documents; nil entries
#   are skipped
# @param check_keys passed through to the serializer for each document
# @param opts [Hash] read for :continue_on_error, :upsert, :multi, :limit
# @param write_concern decides (via +Mongo::WriteConcern.gle?+) whether the
#   message is sent with or without a getlasterror round trip
# @param collection_name defaults to +@name+
# @return whatever the +instrument+ call returns
# @raise [BSON::InvalidDocument] when the message grows past
#   +@connection.max_message_size+
def send_write_operation(op_type, selector, doc_or_docs, check_keys, opts, write_concern, collection_name=@name)
  message = BSON::ByteBuffer.new("", @connection.max_message_size)
  leading_flag = (op_type == :insert && opts[:continue_on_error]) ? 1 : 0
  message.put_int(leading_flag)
  BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{collection_name}")

  case op_type
  when :update
    # bit 1 => upsert, bit 2 => multi
    update_flags = 0
    update_flags |= 1 if opts[:upsert]
    update_flags |= 2 if opts[:multi]
    message.put_int(update_flags)
  when :delete
    # flag is 1 only when a non-zero :limit was supplied
    limit = opts[:limit]
    message.put_int((limit && limit != 0) ? 1 : 0)
  end

  if selector
    message.put_binary(BSON::BSON_CODER.serialize(selector, false, true, @connection.max_bson_size).to_s)
  end

  [doc_or_docs].flatten(1).compact.each do |document|
    serialized = BSON::BSON_CODER.serialize(document, check_keys, true, @connection.max_bson_size)
    message.put_binary(serialized.to_s)
    # The size is checked after every append, so an oversized message is
    # rejected before anything is sent.
    next unless message.size > @connection.max_message_size
    raise BSON::InvalidDocument, "Message is too large. This message is limited to #{@connection.max_message_size} bytes."
  end

  instrument(op_type,
             :database => @db.name,
             :collection => collection_name,
             :selector => selector,
             :documents => doc_or_docs) do
    op_code = OPCODE[op_type]
    acknowledged = Mongo::WriteConcern.gle?(write_concern)
    if acknowledged
      @connection.send_message_with_gle(op_code, message, @db.name, nil, write_concern)
    else
      @connection.send_message(op_code, message)
    end
  end
end
Private Instance Methods
batch_message_append(message, serialized_doc, write_concern)
click to toggle source
# File lib/mongo/collection_writer.rb, line 274
# Appends one already-serialized document to +message+.
#
# @param message buffer that receives the bytes via +put_binary+
# @param serialized_doc converted with +to_s+ before being appended
# @param write_concern not used by this implementation
# @return whatever +message.put_binary+ returns
def batch_message_append(message, serialized_doc, write_concern)
  payload = serialized_doc.to_s
  message.put_binary(payload)
end
batch_message_initialize(message, op_type, continue_on_error, write_concern)
click to toggle source
# File lib/mongo/collection_writer.rb, line 268
# Resets +message+ and writes the header of a batch message: one int
# (1 when +continue_on_error+ is truthy, else 0) followed by the
# "db.collection" name built from +@db.name+ and +@name+.
#
# @param message buffer to reset and fill
# @param op_type not used by this implementation
# @param continue_on_error selects the value of the leading int
# @param write_concern not used by this implementation
# @return whatever +BSON::BSON_RUBY.serialize_cstr+ returns
def batch_message_initialize(message, op_type, continue_on_error, write_concern)
  # Both resets are invoked, in this order: clear! on the message, then
  # clear on whatever clear! returned.
  after_hard_reset = message.clear!
  after_hard_reset.clear

  leading_flag = continue_on_error ? 1 : 0
  message.put_int(leading_flag)

  namespace = "#{@db.name}.#{@name}"
  BSON::BSON_RUBY.serialize_cstr(message, namespace)
end
batch_message_send(message, op_type, batch_docs, write_concern, continue_on_error)
click to toggle source
# File lib/mongo/collection_writer.rb, line 278
# Sends a prepared batch message as an insert, inside an +instrument+ call
# labelled :insert.
#
# @param message the message to send
# @param op_type not used by this implementation; the send is always an
#   insert (+Mongo::Constants::OP_INSERT+)
# @param batch_docs reported to +instrument+ as :documents
# @param write_concern decides (via +Mongo::WriteConcern.gle?+) which
#   connection method performs the send
# @param continue_on_error not used by this implementation
# @return whatever the +instrument+ call returns
def batch_message_send(message, op_type, batch_docs, write_concern, continue_on_error)
  instrument(:insert, :database => @db.name, :collection => @name, :documents => batch_docs) do
    # Without a getlasterror requirement the message is sent as-is.
    unless Mongo::WriteConcern.gle?(write_concern)
      next @connection.send_message(Mongo::Constants::OP_INSERT, message)
    end

    @connection.send_message_with_gle(Mongo::Constants::OP_INSERT, message, @db.name, nil, write_concern)
  end
end
batch_write_max_sizes(write_concern)
click to toggle source
# File lib/mongo/collection_writer.rb, line 288
# Reports the size limits used for batch writes, all read from
# +@connection+.
#
# @param write_concern not used by this implementation
# @return [Array] three values: the connection's max message size, the max
#   message size again, and the max BSON size
def batch_write_max_sizes(write_concern)
  first_limit = @connection.max_message_size
  second_limit = @connection.max_message_size
  bson_limit = @connection.max_bson_size
  [first_limit, second_limit, bson_limit]
end