You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Enso-Bot/venv/Lib/site-packages/mysqlx/protocol.py

878 lines
33 KiB
Python

# Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is also distributed with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have included with
# MySQL.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""Implementation of the X protocol for MySQL servers."""
import struct
from .compat import STRING_TYPES, INT_TYPES
from .errors import (InterfaceError, NotSupportedError, OperationalError,
ProgrammingError)
from .expr import (ExprParser, build_expr, build_scalar, build_bool_scalar,
build_int_scalar, build_unsigned_int_scalar)
from .helpers import encode_to_bytes, get_item_or_attr
from .result import Column
from .protobuf import (CRUD_PREPARE_MAPPING, SERVER_MESSAGES,
PROTOBUF_REPEATED_TYPES, Message, mysqlxpb_enum)
class MessageReaderWriter(object):
"""Implements a Message Reader/Writer.
Args:
socket_stream (mysqlx.connection.SocketStream): `SocketStream` object.
"""
def __init__(self, socket_stream):
self._stream = socket_stream
self._msg = None
def _read_message(self):
"""Read message.
Raises:
:class:`mysqlx.ProgrammingError`: If e connected server does not
have the MySQL X protocol plugin
enabled.
Returns:
mysqlx.protobuf.Message: MySQL X Protobuf Message.
"""
hdr = self._stream.read(5)
msg_len, msg_type = struct.unpack("<LB", hdr)
if msg_type == 10:
raise ProgrammingError("The connected server does not have the "
"MySQL X protocol plugin enabled or protocol"
"mismatch")
payload = self._stream.read(msg_len - 1)
msg_type_name = SERVER_MESSAGES.get(msg_type)
if not msg_type_name:
raise ValueError("Unknown msg_type: {0}".format(msg_type))
# Do not parse empty notices, Message requires a type in payload.
if msg_type == 11 and payload == b"":
return self._read_message()
else:
try:
msg = Message.from_server_message(msg_type, payload)
except RuntimeError:
return self._read_message()
return msg
def read_message(self):
"""Read message.
Returns:
mysqlx.protobuf.Message: MySQL X Protobuf Message.
"""
if self._msg is not None:
msg = self._msg
self._msg = None
return msg
return self._read_message()
def push_message(self, msg):
"""Push message.
Args:
msg (mysqlx.protobuf.Message): MySQL X Protobuf Message.
Raises:
:class:`mysqlx.OperationalError`: If message push slot is full.
"""
if self._msg is not None:
raise OperationalError("Message push slot is full")
self._msg = msg
def write_message(self, msg_id, msg):
"""Write message.
Args:
msg_id (int): The message ID.
msg (mysqlx.protobuf.Message): MySQL X Protobuf Message.
"""
msg_str = encode_to_bytes(msg.serialize_to_string())
header = struct.pack("<LB", len(msg_str) + 1, msg_id)
self._stream.sendall(b"".join([header, msg_str]))
class Protocol(object):
"""Implements the MySQL X Protocol.
Args:
read_writer (mysqlx.protocol.MessageReaderWriter): A Message \
Reader/Writer object.
"""
def __init__(self, reader_writer):
self._reader = reader_writer
self._writer = reader_writer
self._message = None
def _apply_filter(self, msg, stmt):
"""Apply filter.
Args:
msg (mysqlx.protobuf.Message): The MySQL X Protobuf Message.
stmt (Statement): A `Statement` based type object.
"""
if stmt.has_where:
msg["criteria"] = stmt.get_where_expr()
if stmt.has_sort:
msg["order"].extend(stmt.get_sort_expr())
if stmt.has_group_by:
msg["grouping"].extend(stmt.get_grouping())
if stmt.has_having:
msg["grouping_criteria"] = stmt.get_having()
def _create_any(self, arg):
"""Create any.
Args:
arg (object): Arbitrary object.
Returns:
mysqlx.protobuf.Message: MySQL X Protobuf Message.
"""
if isinstance(arg, STRING_TYPES):
value = Message("Mysqlx.Datatypes.Scalar.String", value=arg)
scalar = Message("Mysqlx.Datatypes.Scalar", type=8, v_string=value)
return Message("Mysqlx.Datatypes.Any", type=1, scalar=scalar)
elif isinstance(arg, bool):
return Message("Mysqlx.Datatypes.Any", type=1,
scalar=build_bool_scalar(arg))
elif isinstance(arg, INT_TYPES):
if arg < 0:
return Message("Mysqlx.Datatypes.Any", type=1,
scalar=build_int_scalar(arg))
return Message("Mysqlx.Datatypes.Any", type=1,
scalar=build_unsigned_int_scalar(arg))
elif isinstance(arg, tuple) and len(arg) == 2:
arg_key, arg_value = arg
obj_fld = Message("Mysqlx.Datatypes.Object.ObjectField",
key=arg_key, value=self._create_any(arg_value))
obj = Message("Mysqlx.Datatypes.Object",
fld=[obj_fld.get_message()])
return Message("Mysqlx.Datatypes.Any", type=2, obj=obj)
elif isinstance(arg, dict) or (isinstance(arg, (list, tuple)) and
isinstance(arg[0], dict)):
array_values = []
for items in arg:
obj_flds = []
for key, value in items.items():
# Array can only handle Any types, Mysqlx.Datatypes.Any.obj
obj_fld = Message("Mysqlx.Datatypes.Object.ObjectField",
key=key, value=self._create_any(value))
obj_flds.append(obj_fld.get_message())
msg_obj = Message("Mysqlx.Datatypes.Object", fld=obj_flds)
msg_any = Message("Mysqlx.Datatypes.Any", type=2, obj=msg_obj)
array_values.append(msg_any.get_message())
msg = Message("Mysqlx.Datatypes.Array")
msg["value"] = array_values
return Message("Mysqlx.Datatypes.Any", type=3, array=msg)
return None
def _get_binding_args(self, stmt, is_scalar=True):
"""Returns the binding any/scalar.
Args:
stmt (Statement): A `Statement` based type object.
is_scalar (bool): `True` to return scalar values.
Raises:
:class:`mysqlx.ProgrammingError`: If unable to find placeholder for
parameter.
Returns:
list: A list of ``Any`` or ``Scalar`` objects.
"""
build_value = lambda value: build_scalar(value).get_message() \
if is_scalar else self._create_any(value).get_message()
bindings = stmt.get_bindings()
binding_map = stmt.get_binding_map()
# If binding_map is None it's a SqlStatement object
if binding_map is None:
return [build_value(value) for value in bindings]
count = len(binding_map)
args = count * [None]
if count != len(bindings):
raise ProgrammingError("The number of bind parameters and "
"placeholders do not match")
for name, value in bindings.items():
if name not in binding_map:
raise ProgrammingError("Unable to find placeholder for "
"parameter: {0}".format(name))
pos = binding_map[name]
args[pos] = build_value(value)
return args
def _process_frame(self, msg, result):
"""Process frame.
Args:
msg (mysqlx.protobuf.Message): A MySQL X Protobuf Message.
result (Result): A `Result` based type object.
"""
if msg["type"] == 1:
warn_msg = Message.from_message("Mysqlx.Notice.Warning",
msg["payload"])
result.append_warning(warn_msg.level, warn_msg.code, warn_msg.msg)
elif msg["type"] == 2:
Message.from_message("Mysqlx.Notice.SessionVariableChanged",
msg["payload"])
elif msg["type"] == 3:
sess_state_msg = Message.from_message(
"Mysqlx.Notice.SessionStateChanged", msg["payload"])
if sess_state_msg["param"] == mysqlxpb_enum(
"Mysqlx.Notice.SessionStateChanged.Parameter."
"GENERATED_DOCUMENT_IDS"):
result.set_generated_ids(
[get_item_or_attr(
get_item_or_attr(value, 'v_octets'), 'value').decode()
for value in sess_state_msg["value"]])
else: # Following results are unitary and not a list
sess_state_value = sess_state_msg["value"][0] \
if isinstance(sess_state_msg["value"],
tuple(PROTOBUF_REPEATED_TYPES)) \
else sess_state_msg["value"]
if sess_state_msg["param"] == mysqlxpb_enum(
"Mysqlx.Notice.SessionStateChanged.Parameter."
"ROWS_AFFECTED"):
result.set_rows_affected(
get_item_or_attr(sess_state_value, "v_unsigned_int"))
elif sess_state_msg["param"] == mysqlxpb_enum(
"Mysqlx.Notice.SessionStateChanged.Parameter."
"GENERATED_INSERT_ID"):
result.set_generated_insert_id(get_item_or_attr(
sess_state_value, "v_unsigned_int"))
def _read_message(self, result):
"""Read message.
Args:
result (Result): A `Result` based type object.
"""
while True:
msg = self._reader.read_message()
if msg.type == "Mysqlx.Error":
raise OperationalError(msg["msg"], msg["code"])
elif msg.type == "Mysqlx.Notice.Frame":
try:
self._process_frame(msg, result)
except:
continue
elif msg.type == "Mysqlx.Sql.StmtExecuteOk":
return None
elif msg.type == "Mysqlx.Resultset.FetchDone":
result.set_closed(True)
elif msg.type == "Mysqlx.Resultset.FetchDoneMoreResultsets":
result.set_has_more_results(True)
elif msg.type == "Mysqlx.Resultset.Row":
result.set_has_data(True)
break
else:
break
return msg
def get_capabilites(self):
"""Get capabilities.
Returns:
mysqlx.protobuf.Message: MySQL X Protobuf Message.
"""
msg = Message("Mysqlx.Connection.CapabilitiesGet")
self._writer.write_message(
mysqlxpb_enum("Mysqlx.ClientMessages.Type.CON_CAPABILITIES_GET"),
msg)
msg = self._reader.read_message()
while msg.type == "Mysqlx.Notice.Frame":
msg = self._reader.read_message()
return msg
def set_capabilities(self, **kwargs):
"""Set capabilities.
Args:
**kwargs: Arbitrary keyword arguments.
Returns:
mysqlx.protobuf.Message: MySQL X Protobuf Message.
"""
capabilities = Message("Mysqlx.Connection.Capabilities")
for key, value in kwargs.items():
capability = Message("Mysqlx.Connection.Capability")
capability["name"] = key
if isinstance(value, dict):
items = value
obj_flds = []
for item in items:
obj_fld = Message("Mysqlx.Datatypes.Object.ObjectField",
key=item,
value=self._create_any(items[item]))
obj_flds.append(obj_fld.get_message())
msg_obj = Message("Mysqlx.Datatypes.Object", fld=obj_flds)
msg_any = Message("Mysqlx.Datatypes.Any", type=2, obj=msg_obj)
capability["value"] = msg_any.get_message()
else:
capability["value"] = self._create_any(value)
capabilities["capabilities"].extend([capability.get_message()])
msg = Message("Mysqlx.Connection.CapabilitiesSet")
msg["capabilities"] = capabilities
self._writer.write_message(
mysqlxpb_enum("Mysqlx.ClientMessages.Type.CON_CAPABILITIES_SET"),
msg)
try:
return self.read_ok()
except InterfaceError as err:
# Skip capability "session_connect_attrs" error since
# is only available on version >= 8.0.16
if err.errno != 5002:
raise
return None
def set_session_capabilities(self, **kwargs):
"""Set capabilities.
Args:
**kwargs: Arbitrary keyword arguments.
Returns:
mysqlx.protobuf.Message: MySQL X Protobuf Message.
"""
capabilities = Message("Mysqlx.Connection.Capabilities")
for key, value in kwargs.items():
capability = Message("Mysqlx.Connection.Capability")
capability["name"] = key
capability["value"] = self._create_any(value)
capabilities["capabilities"].extend([capability.get_message()])
msg = Message("Mysqlx.Connection.CapabilitiesSet")
msg["session_connect_attrs"] = capabilities
self._writer.write_message(
mysqlxpb_enum("Mysqlx.ClientMessages.Type.CON_CAPABILITIES_SET"),
msg)
return self.read_ok()
def send_auth_start(self, method, auth_data=None, initial_response=None):
"""Send authenticate start.
Args:
method (str): Message method.
auth_data (Optional[str]): Authentication data.
initial_response (Optional[str]): Initial response.
"""
msg = Message("Mysqlx.Session.AuthenticateStart")
msg["mech_name"] = method
if auth_data is not None:
msg["auth_data"] = auth_data
if initial_response is not None:
msg["initial_response"] = initial_response
self._writer.write_message(mysqlxpb_enum(
"Mysqlx.ClientMessages.Type.SESS_AUTHENTICATE_START"), msg)
def read_auth_continue(self):
"""Read authenticate continue.
Raises:
:class:`InterfaceError`: If the message type is not
`Mysqlx.Session.AuthenticateContinue`
Returns:
str: The authentication data.
"""
msg = self._reader.read_message()
while msg.type == "Mysqlx.Notice.Frame":
msg = self._reader.read_message()
if msg.type != "Mysqlx.Session.AuthenticateContinue":
raise InterfaceError("Unexpected message encountered during "
"authentication handshake")
return msg["auth_data"]
def send_auth_continue(self, auth_data):
"""Send authenticate continue.
Args:
auth_data (str): Authentication data.
"""
msg = Message("Mysqlx.Session.AuthenticateContinue",
auth_data=auth_data)
self._writer.write_message(mysqlxpb_enum(
"Mysqlx.ClientMessages.Type.SESS_AUTHENTICATE_CONTINUE"), msg)
def read_auth_ok(self):
"""Read authenticate OK.
Raises:
:class:`mysqlx.InterfaceError`: If message type is `Mysqlx.Error`.
"""
while True:
msg = self._reader.read_message()
if msg.type == "Mysqlx.Session.AuthenticateOk":
break
if msg.type == "Mysqlx.Error":
raise InterfaceError(msg.msg)
def send_prepare_prepare(self, msg_type, msg, stmt):
"""
Send prepare statement.
Args:
msg_type (str): Message ID string.
msg (mysqlx.protobuf.Message): MySQL X Protobuf Message.
stmt (Statement): A `Statement` based type object.
Raises:
:class:`mysqlx.NotSupportedError`: If prepared statements are not
supported.
.. versionadded:: 8.0.16
"""
if stmt.has_limit and msg.type != "Mysqlx.Crud.Insert":
# Remove 'limit' from message by building a new one
if msg.type == "Mysqlx.Crud.Find":
_, msg = self.build_find(stmt)
elif msg.type == "Mysqlx.Crud.Update":
_, msg = self.build_update(stmt)
elif msg.type == "Mysqlx.Crud.Delete":
_, msg = self.build_delete(stmt)
else:
raise ValueError("Invalid message type: {}".format(msg_type))
# Build 'limit_expr' message
position = len(stmt.get_bindings())
placeholder = mysqlxpb_enum("Mysqlx.Expr.Expr.Type.PLACEHOLDER")
msg_limit_expr = Message("Mysqlx.Crud.LimitExpr")
msg_limit_expr["row_count"] = Message("Mysqlx.Expr.Expr",
type=placeholder,
position=position)
if msg.type == "Mysqlx.Crud.Find":
msg_limit_expr["offset"] = Message("Mysqlx.Expr.Expr",
type=placeholder,
position=position + 1)
msg["limit_expr"] = msg_limit_expr
oneof_type, oneof_op = CRUD_PREPARE_MAPPING[msg_type]
msg_oneof = Message("Mysqlx.Prepare.Prepare.OneOfMessage")
msg_oneof["type"] = mysqlxpb_enum(oneof_type)
msg_oneof[oneof_op] = msg
msg_prepare = Message("Mysqlx.Prepare.Prepare")
msg_prepare["stmt_id"] = stmt.stmt_id
msg_prepare["stmt"] = msg_oneof
self._writer.write_message(
mysqlxpb_enum("Mysqlx.ClientMessages.Type.PREPARE_PREPARE"),
msg_prepare)
try:
self.read_ok()
except InterfaceError:
raise NotSupportedError
def send_prepare_execute(self, msg_type, msg, stmt):
"""
Send execute statement.
Args:
msg_type (str): Message ID string.
msg (mysqlx.protobuf.Message): MySQL X Protobuf Message.
stmt (Statement): A `Statement` based type object.
.. versionadded:: 8.0.16
"""
oneof_type, oneof_op = CRUD_PREPARE_MAPPING[msg_type]
msg_oneof = Message("Mysqlx.Prepare.Prepare.OneOfMessage")
msg_oneof["type"] = mysqlxpb_enum(oneof_type)
msg_oneof[oneof_op] = msg
msg_execute = Message("Mysqlx.Prepare.Execute")
msg_execute["stmt_id"] = stmt.stmt_id
args = self._get_binding_args(stmt, is_scalar=False)
if args:
msg_execute["args"].extend(args)
if stmt.has_limit:
msg_execute["args"].extend([
self._create_any(stmt.get_limit_row_count()).get_message(),
self._create_any(stmt.get_limit_offset()).get_message()
])
self._writer.write_message(
mysqlxpb_enum("Mysqlx.ClientMessages.Type.PREPARE_EXECUTE"),
msg_execute)
def send_prepare_deallocate(self, stmt_id):
"""
Send prepare deallocate statement.
Args:
stmt_id (int): Statement ID.
.. versionadded:: 8.0.16
"""
msg_dealloc = Message("Mysqlx.Prepare.Deallocate")
msg_dealloc["stmt_id"] = stmt_id
self._writer.write_message(
mysqlxpb_enum("Mysqlx.ClientMessages.Type.PREPARE_DEALLOCATE"),
msg_dealloc)
self.read_ok()
def send_msg_without_ps(self, msg_type, msg, stmt):
"""
Send a message without prepared statements support.
Args:
msg_type (str): Message ID string.
msg (mysqlx.protobuf.Message): MySQL X Protobuf Message.
stmt (Statement): A `Statement` based type object.
.. versionadded:: 8.0.16
"""
if stmt.has_limit:
msg_limit = Message("Mysqlx.Crud.Limit")
msg_limit["row_count"] = stmt.get_limit_row_count()
if msg.type == "Mysqlx.Crud.Find":
msg_limit["offset"] = stmt.get_limit_offset()
msg["limit"] = msg_limit
is_scalar = False \
if msg_type == "Mysqlx.ClientMessages.Type.SQL_STMT_EXECUTE" \
else True
args = self._get_binding_args(stmt, is_scalar=is_scalar)
if args:
msg["args"].extend(args)
self.send_msg(msg_type, msg)
def send_msg(self, msg_type, msg):
"""
Send a message.
Args:
msg_type (str): Message ID string.
msg (mysqlx.protobuf.Message): MySQL X Protobuf Message.
.. versionadded:: 8.0.16
"""
self._writer.write_message(mysqlxpb_enum(msg_type), msg)
def build_find(self, stmt):
"""Build find/read message.
Args:
stmt (Statement): A :class:`mysqlx.ReadStatement` or
:class:`mysqlx.FindStatement` object.
Returns:
(tuple): Tuple containing:
* `str`: Message ID string.
* :class:`mysqlx.protobuf.Message`: MySQL X Protobuf Message.
.. versionadded:: 8.0.16
"""
data_model = mysqlxpb_enum("Mysqlx.Crud.DataModel.DOCUMENT"
if stmt.is_doc_based() else
"Mysqlx.Crud.DataModel.TABLE")
collection = Message("Mysqlx.Crud.Collection",
name=stmt.target.name,
schema=stmt.schema.name)
msg = Message("Mysqlx.Crud.Find", data_model=data_model,
collection=collection)
if stmt.has_projection:
msg["projection"] = stmt.get_projection_expr()
self._apply_filter(msg, stmt)
if stmt.is_lock_exclusive():
msg["locking"] = \
mysqlxpb_enum("Mysqlx.Crud.Find.RowLock.EXCLUSIVE_LOCK")
elif stmt.is_lock_shared():
msg["locking"] = \
mysqlxpb_enum("Mysqlx.Crud.Find.RowLock.SHARED_LOCK")
if stmt.lock_contention > 0:
msg["locking_options"] = stmt.lock_contention
return "Mysqlx.ClientMessages.Type.CRUD_FIND", msg
def build_update(self, stmt):
"""Build update message.
Args:
stmt (Statement): A :class:`mysqlx.ModifyStatement` or
:class:`mysqlx.UpdateStatement` object.
Returns:
(tuple): Tuple containing:
* `str`: Message ID string.
* :class:`mysqlx.protobuf.Message`: MySQL X Protobuf Message.
.. versionadded:: 8.0.16
"""
data_model = mysqlxpb_enum("Mysqlx.Crud.DataModel.DOCUMENT"
if stmt.is_doc_based() else
"Mysqlx.Crud.DataModel.TABLE")
collection = Message("Mysqlx.Crud.Collection",
name=stmt.target.name,
schema=stmt.schema.name)
msg = Message("Mysqlx.Crud.Update", data_model=data_model,
collection=collection)
self._apply_filter(msg, stmt)
for _, update_op in stmt.get_update_ops().items():
operation = Message("Mysqlx.Crud.UpdateOperation")
operation["operation"] = update_op.update_type
operation["source"] = update_op.source
if update_op.value is not None:
operation["value"] = build_expr(update_op.value)
msg["operation"].extend([operation.get_message()])
return "Mysqlx.ClientMessages.Type.CRUD_UPDATE", msg
def build_delete(self, stmt):
"""Build delete message.
Args:
stmt (Statement): A :class:`mysqlx.DeleteStatement` or
:class:`mysqlx.RemoveStatement` object.
Returns:
(tuple): Tuple containing:
* `str`: Message ID string.
* :class:`mysqlx.protobuf.Message`: MySQL X Protobuf Message.
.. versionadded:: 8.0.16
"""
data_model = mysqlxpb_enum("Mysqlx.Crud.DataModel.DOCUMENT"
if stmt.is_doc_based() else
"Mysqlx.Crud.DataModel.TABLE")
collection = Message("Mysqlx.Crud.Collection", name=stmt.target.name,
schema=stmt.schema.name)
msg = Message("Mysqlx.Crud.Delete", data_model=data_model,
collection=collection)
self._apply_filter(msg, stmt)
return "Mysqlx.ClientMessages.Type.CRUD_DELETE", msg
def build_execute_statement(self, namespace, stmt, fields=None):
"""Build execute statement.
Args:
namespace (str): The namespace.
stmt (Statement): A `Statement` based type object.
fields (Optional[dict]): The message fields.
Returns:
(tuple): Tuple containing:
* `str`: Message ID string.
* :class:`mysqlx.protobuf.Message`: MySQL X Protobuf Message.
.. versionadded:: 8.0.16
"""
msg = Message("Mysqlx.Sql.StmtExecute", namespace=namespace, stmt=stmt,
compact_metadata=False)
if fields:
obj_flds = []
for key, value in fields.items():
obj_fld = Message("Mysqlx.Datatypes.Object.ObjectField",
key=key, value=self._create_any(value))
obj_flds.append(obj_fld.get_message())
msg_obj = Message("Mysqlx.Datatypes.Object", fld=obj_flds)
msg_any = Message("Mysqlx.Datatypes.Any", type=2, obj=msg_obj)
msg["args"] = [msg_any.get_message()]
return "Mysqlx.ClientMessages.Type.SQL_STMT_EXECUTE", msg
def build_insert(self, stmt):
"""Build insert statement.
Args:
stmt (Statement): A :class:`mysqlx.AddStatement` or
:class:`mysqlx.InsertStatement` object.
Returns:
(tuple): Tuple containing:
* `str`: Message ID string.
* :class:`mysqlx.protobuf.Message`: MySQL X Protobuf Message.
.. versionadded:: 8.0.16
"""
data_model = mysqlxpb_enum("Mysqlx.Crud.DataModel.DOCUMENT"
if stmt.is_doc_based() else
"Mysqlx.Crud.DataModel.TABLE")
collection = Message("Mysqlx.Crud.Collection",
name=stmt.target.name,
schema=stmt.schema.name)
msg = Message("Mysqlx.Crud.Insert", data_model=data_model,
collection=collection)
if hasattr(stmt, "_fields"):
for field in stmt._fields:
expr = ExprParser(field, not stmt.is_doc_based()) \
.parse_table_insert_field()
msg["projection"].extend([expr.get_message()])
for value in stmt.get_values():
row = Message("Mysqlx.Crud.Insert.TypedRow")
if isinstance(value, list):
for val in value:
row["field"].extend([build_expr(val).get_message()])
else:
row["field"].extend([build_expr(value).get_message()])
msg["row"].extend([row.get_message()])
if hasattr(stmt, "is_upsert"):
msg["upsert"] = stmt.is_upsert()
return "Mysqlx.ClientMessages.Type.CRUD_INSERT", msg
def close_result(self, result):
"""Close the result.
Args:
result (Result): A `Result` based type object.
Raises:
:class:`mysqlx.OperationalError`: If message read is None.
"""
msg = self._read_message(result)
if msg is not None:
raise OperationalError("Expected to close the result")
def read_row(self, result):
"""Read row.
Args:
result (Result): A `Result` based type object.
"""
msg = self._read_message(result)
if msg is None:
return None
if msg.type == "Mysqlx.Resultset.Row":
return msg
self._reader.push_message(msg)
return None
def get_column_metadata(self, result):
"""Returns column metadata.
Args:
result (Result): A `Result` based type object.
Raises:
:class:`mysqlx.InterfaceError`: If unexpected message.
"""
columns = []
while True:
msg = self._read_message(result)
if msg is None:
break
if msg.type == "Mysqlx.Resultset.Row":
self._reader.push_message(msg)
break
if msg.type != "Mysqlx.Resultset.ColumnMetaData":
raise InterfaceError("Unexpected msg type")
col = Column(msg["type"], msg["catalog"], msg["schema"],
msg["table"], msg["original_table"],
msg["name"], msg["original_name"],
msg.get("length", 21),
msg.get("collation", 0),
msg.get("fractional_digits", 0),
msg.get("flags", 16),
msg.get("content_type"))
columns.append(col)
return columns
def read_ok(self):
"""Read OK.
Raises:
:class:`mysqlx.InterfaceError`: If unexpected message.
"""
msg = self._reader.read_message()
if msg.type == "Mysqlx.Error":
raise InterfaceError("Mysqlx.Error: {}".format(msg["msg"]),
errno=msg["code"])
if msg.type != "Mysqlx.Ok":
raise InterfaceError("Unexpected message encountered")
def send_connection_close(self):
"""Send connection close."""
msg = Message("Mysqlx.Connection.Close")
self._writer.write_message(mysqlxpb_enum(
"Mysqlx.ClientMessages.Type.CON_CLOSE"), msg)
def send_close(self):
"""Send close."""
msg = Message("Mysqlx.Session.Close")
self._writer.write_message(mysqlxpb_enum(
"Mysqlx.ClientMessages.Type.SESS_CLOSE"), msg)
def send_expect_open(self):
"""Send expectation."""
cond_key = mysqlxpb_enum(
"Mysqlx.Expect.Open.Condition.Key.EXPECT_FIELD_EXIST")
msg_oc = Message("Mysqlx.Expect.Open.Condition")
msg_oc["condition_key"] = cond_key
msg_oc["condition_value"] = "6.1"
msg_eo = Message("Mysqlx.Expect.Open")
msg_eo['cond'] = [msg_oc.get_message()]
self._writer.write_message(mysqlxpb_enum(
"Mysqlx.ClientMessages.Type.EXPECT_OPEN"), msg_eo)
def send_reset(self, keep_open=None):
"""Send reset session message.
Returns:
boolean: ``True`` if the server will keep the session open,
otherwise ``False``.
"""
msg = Message("Mysqlx.Session.Reset")
if keep_open is None:
try:
# Send expectation: keep connection open
self.send_expect_open()
self.read_ok()
keep_open = True
except InterfaceError:
# Expectation is unkown by this version of the server
keep_open = False
if keep_open:
msg["keep_open"] = True
self._writer.write_message(mysqlxpb_enum(
"Mysqlx.ClientMessages.Type.SESS_RESET"), msg)
self.read_ok()
if keep_open:
return True
return False