{"id":8158,"date":"2018-12-21T14:36:56","date_gmt":"2018-12-21T16:36:56","guid":{"rendered":"http:\/\/blog.plataformatec.com.br\/?p=8158"},"modified":"2019-01-04T18:08:06","modified_gmt":"2019-01-04T20:08:06","slug":"building-a-new-mysql-adapter-for-ecto-part-iii-dbconnection-integration","status":"publish","type":"post","link":"http:\/\/blog.plataformatec.com.br\/2018\/12\/building-a-new-mysql-adapter-for-ecto-part-iii-dbconnection-integration\/","title":{"rendered":"Building a new MySQL adapter for Ecto, Part III: DBConnection Integration"},"content":{"rendered":"
Welcome to the “Building a new MySQL adapter for Ecto” series:<\/p>\n
In the first two articles of the series we learned the basic building blocks for interacting with a MySQL server using its binary protocol over TCP.<\/p>\n
To have a production-quality driver, however, there’s more work to do. Namely, we need to think about:<\/p>\n
In short, we need: reliability, performance, and first-class support for common DB features. This is where DBConnection<\/a> comes in.<\/p>\n DBConnection is a behaviour module for implementing efficient database connection client processes, pools and transactions. It has been created by Elixir and Ecto Core Team member James Fish and has been introduced in Ecto v2.0.<\/p>\n Per DBConnection documentation<\/a> we can see how it addresses concerns mentioned above:<\/p>\n \n DBConnection handles callbacks differently to most behaviours. Some callbacks will be called in the calling process, with the state copied to and from the calling process. This is useful when the data for a request is large and means that a calling process can interact with a socket directly.<\/p>\n A side effect of this is that query handling can be written in a simple blocking fashion, while the connection process itself will remain responsive to OTP messages and can enqueue and cancel queued requests.<\/p>\n If a request or series of requests takes too long to handle in the client process a timeout will trigger and the socket can be cleanly disconnected by the connection process.<\/p>\n If a calling process waits too long to start its request it will timeout and its request will be cancelled. This prevents requests building up when the database cannot keep up.<\/p>\n If no requests are received for a period of time the connection will trigger an idle timeout and the database can be pinged to keep the connection alive.<\/p>\n Should the connection be lost, attempts will be made to reconnect with (configurable) exponential random backoff to reconnect. 
All state is lost when a connection disconnects but the process is reused.<\/p>\n The Let’s see how we can use it!<\/p>\n We will first create a module responsible for implementing DBConnection callbacks:<\/p>\n When we compile it, we’ll get a bunch of warnings about callbacks that we haven’t implemented yet.<\/p>\n Let’s start with the That’s a lot to unpack so let’s break this down:<\/p>\n It’s worth taking a step back at looking at our overall design:<\/p>\n Let’s take a look at Notice the callbacks reference types like: As far as DBConnection is concerned, Let’s define the first callback, The callback receives If we receive an The only places that we could get Note, we set Let’s now define the Let’s break this down.<\/p>\n Similarly to Same as last time, if we get an We’ve mentioned that the Finally, let’s add a public function that users of the driver will use:<\/p>\n and see it all working.<\/p>\n Arguments to In this article, we’ve seen a sneak peek of integration with the DBConnection library. It gave us With this, we’re almost done with our adapter series. In the final article we’ll use our driver as an Ecto adapter. Stay tuned!<\/p>\n <\/a><\/p>\n","protected":false},"excerpt":{"rendered":" Welcome to the “Building a new MySQL adapter for Ecto” series: Part I: Hello World Part II: Encoding\/Decoding Part III: DBConnection Integration (you’re here!) Part IV: Ecto Integration In the first two articles of the series we have learned the basic building blocks for interacting with a MySQL server using its binary protocol over TCP. 
… \u00bb<\/a><\/p>\n","protected":false},"author":70,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"ngg_post_thumbnail":0,"footnotes":""},"categories":[1],"tags":[238,143],"aioseo_notices":[],"jetpack_sharing_enabled":true,"jetpack_featured_media_url":"","_links":{"self":[{"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/posts\/8158"}],"collection":[{"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/users\/70"}],"replies":[{"embeddable":true,"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/comments?post=8158"}],"version-history":[{"count":31,"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/posts\/8158\/revisions"}],"predecessor-version":[{"id":8336,"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/posts\/8158\/revisions\/8336"}],"wp:attachment":[{"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/media?parent=8158"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/categories?post=8158"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/blog.plataformatec.com.br\/wp-json\/wp\/v2\/tags?post=8158"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}DBConnection<\/h2>\n
DBConnection.Query<\/code> protocol provide utility functions so that queries can be prepared or encoded and results decoding without blocking the connection or pool.<\/p><\/blockquote>\n
DBConnection Integration<\/h2>\n
defmodule MyXQL.Protocol do\n use DBConnection\nend\n<\/code><\/pre>\n
connect\/1<\/code><\/a> callback and while at it, add some supporting code:<\/p>\n
defmodule MyXQL.Error do\n  defexception [:message]\nend\n\ndefmodule MyXQL.Protocol do\n  @moduledoc false\n  use DBConnection\n  import MyXQL.Messages\n  defstruct [:sock]\n\n  @impl true\n  def connect(opts) do\n    hostname = Keyword.get(opts, :hostname, \"localhost\")\n    port = Keyword.get(opts, :port, 3306)\n    timeout = Keyword.get(opts, :timeout, 5000)\n    username = Keyword.get(opts, :username, System.get_env(\"USER\")) || raise \"username is missing\"\n    sock_opts = [:binary, active: false]\n\n    case :gen_tcp.connect(String.to_charlist(hostname), port, sock_opts) do\n      {:ok, sock} ->\n        handshake(username, timeout, %__MODULE__{sock: sock})\n\n      {:error, reason} ->\n        {:error, %MyXQL.Error{message: \"error when connecting: #{inspect(reason)}\"}}\n    end\n  end\n\n  @impl true\n  def checkin(state) do\n    {:ok, state}\n  end\n\n  @impl true\n  def checkout(state) do\n    {:ok, state}\n  end\n\n  @impl true\n  def ping(state) do\n    {:ok, state}\n  end\n\n  # Performs the handshake and returns {:ok, state} or {:error, exception}.\n  defp handshake(username, timeout, state) do\n    with {:ok, data} <- :gen_tcp.recv(state.sock, 0, timeout),\n         initial_handshake_packet() = decode_initial_handshake_packet(data),\n         data = encode_handshake_response_packet(username),\n         :ok <- :gen_tcp.send(state.sock, data),\n         {:ok, data} <- :gen_tcp.recv(state.sock, 0, timeout),\n         ok_packet() <- decode_handshake_response_packet(data) do\n      {:ok, state}\n    else\n      err_packet(message: message) ->\n        {:error, %MyXQL.Error{message: \"error when performing handshake: #{message}\"}}\n\n      {:error, reason} ->\n        {:error, %MyXQL.Error{message: \"error when performing handshake: #{inspect(reason)}\"}}\n    end\n  end\nend\n\ndefmodule MyXQL do\n  @moduledoc \"...\"\n\n  @doc \"...\"\n  def start_link(opts) do\n    DBConnection.start_link(MyXQL.Protocol, opts)\n  end\nend\n<\/code><\/pre>\n
\n
connect\/1<\/code> must return
{:ok, state}<\/code> on success and
{:error, exception}<\/code> on failure. Our connection state for now will be just the socket. (In a complete driver we’d use the state to manage prepared transaction references, status of transaction etc.) On error, we return an exception.<\/li>\n
opts<\/code> and provide sane defaults; we then try to connect to the TCP server and, if successful, perform the handshake.<\/li>\n
MyXQL.start_link\/1<\/code> that is an entry point to the driver<\/li>\n
checkin\/1<\/code>,
checkout\/1<\/code> and
ping\/1<\/code> callbacks<\/li>\n<\/ul>\n
\n
MyXQL<\/code> module exposes a small public API and calls into an internal module<\/li>\n
MyXQL.Protocol<\/code> implements
DBConnection<\/code> behaviour and is the place where all side-effects are being handled<\/li>\n
MyXQL.Messages<\/code> implements pure functions for encoding and decoding packets. This separation is really important. By keeping protocol data<\/em> separate from protocol interactions<\/em> code we have a codebase that’s much easier to understand and maintain.<\/li>\n<\/ul>\n
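To make the split between protocol data and protocol interactions concrete, here is a minimal sketch of a pure framing helper in the style of MyXQL.Messages. The module Sketch.Packet and its functions are hypothetical and not part of the driver. The layout it assumes (a 3-byte little-endian payload length followed by a 1-byte sequence id) is the standard MySQL packet header.

```elixir
defmodule Sketch.Packet do
  # Hypothetical helper, not part of MyXQL: pure functions only, no socket access.

  # Prepend the MySQL packet header: 3-byte little-endian length, 1-byte sequence id.
  def encode(payload, sequence_id) when is_binary(payload) do
    <<byte_size(payload)::little-integer-size(24), sequence_id::8, payload::binary>>
  end

  # Split one complete packet off the front of a buffer.
  def decode(<<len::little-integer-size(24), seq::8, payload::binary-size(len), rest::binary>>) do
    {:ok, {seq, payload}, rest}
  end

  # Not enough bytes yet for a full packet.
  def decode(buffer) when is_binary(buffer), do: :incomplete
end
```

Because neither function touches a socket, both can be tested with plain binaries, while the send and receive calls stay in the protocol module.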
Prepared Statements<\/h2>\n
handle_prepare\/3<\/code> and
handle_execute\/4<\/code> callbacks that are used to
\nhandle prepared statements:<\/p>\niex> b DBConnection.handle_prepare\n@callback handle_prepare(query(), opts :: Keyword.t(), state :: any()) ::\n {:ok, query(), new_state :: any()}\n | {:error | :disconnect, Exception.t(), new_state :: any()}\n\nPrepare a query with the database. Return {:ok, query, state} where query is a\nquery to pass to execute\/4 or close\/3, {:error, exception, state} to return an\nerror and continue or {:disconnect, exception, state} to return an error and\ndisconnect.\n\nThis callback is intended for cases where the state of a connection is needed\nto prepare a query and\/or the query can be saved in the database to call later.\n\nThis callback is called in the client process.\n<\/code><\/pre>\n
iex> b DBConnection.handle_execute\n@callback handle_execute(query(), params(), opts :: Keyword.t(), state :: any()) ::\n {:ok, query(), result(), new_state :: any()}\n | {:error | :disconnect, Exception.t(), new_state :: any()}\n\nExecute a query prepared by c:handle_prepare\/3. Return {:ok, query, result,\nstate} to return altered query query and result result and continue, {:error,\nexception, state} to return an error and continue or {:disconnect, exception,\nstate} to return an error and disconnect.\n\nThis callback is called in the client process.\n<\/code><\/pre>\n
query()<\/code>,
result()<\/code> and
params()<\/code>.
\nLet’s take a look at them too:<\/p>\niex> t DBConnection.result\n@type result() :: any()\n\niex> t DBConnection.params\n@type params() :: any()\n\niex> t DBConnection.query\n@type query() :: DBConnection.Query.t()\n<\/code><\/pre>\n
result()<\/code> and
params()<\/code> can be any term (it’s up to us to define these) and the
query()<\/code> must implement the
DBConnection.Query<\/code><\/a> protocol.<\/p>\n
DBConnection.Query<\/code> is used for preparing queries, encoding their params, and decoding their
\nresults. Let’s define query and result structs as well as minimal protocol implementation.<\/p>\ndefmodule MyXQL.Result do\n defstruct [:columns, :rows]\nend\n\ndefmodule MyXQL.Query do\n defstruct [:statement, :statement_id]\n\n defimpl DBConnection.Query do\n def parse(query, _opts), do: query\n\n def describe(query, _opts), do: query\n\n def encode(_query, params, _opts), do: params\n\n def decode(_query, result, _opts), do: result\n end\nend\n<\/code><\/pre>\n
handle_prepare\/3<\/code>:<\/p>\n
defmodule MyXQL.Protocol do\n  # ...\n\n  @impl true\n  def handle_prepare(%MyXQL.Query{statement: statement} = query, _opts, state) do\n    data = encode_com_stmt_prepare(statement)\n\n    with :ok <- sock_send(data, state),\n         {:ok, data} <- sock_recv(state),\n         com_stmt_prepare_ok(statement_id: statement_id) <- decode_com_stmt_prepare_response(data) do\n      query = %{query | statement_id: statement_id}\n      {:ok, query, state}\n    else\n      err_packet(message: message) ->\n        {:error, %MyXQL.Error{message: \"error when preparing query: #{message}\"}, state}\n\n      {:error, reason} ->\n        {:disconnect, %MyXQL.Error{message: \"error when preparing query: #{inspect(reason)}\"}, state}\n    end\n  end\n\n  # DBConnection enforces its own timeouts, so these calls block without one.\n  defp sock_send(data, state), do: :gen_tcp.send(state.sock, data)\n\n  defp sock_recv(state), do: :gen_tcp.recv(state.sock, 0, :infinity)\nend\n<\/code><\/pre>\n
query<\/code>,
opts<\/code> (which we ignore), and
state<\/code>. We encode the query statement into
COM_STMT_PREPARE<\/code><\/a> packet, send it, receive response, decode the
COM_STMT_PREPARE Response<\/code><\/a>, and put the retrieved
statement_id<\/code> into our query struct.<\/p>\n
ERR Packet<\/code><\/a>, we put the error message into our
MyXQL.Error<\/code> exception and return that.<\/p>\n
{:error, reason}<\/code> tuple from are the
gen_tcp.send,recv<\/code> calls – if we get an error there it means there might be something wrong with the socket. By returning
{:disconnect, _, _}<\/code>, DBConnection will take care of closing the socket and will attempt to re-connect with a new one.<\/p>\n
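The distinction between the two failure modes can be sketched in isolation. The module Sketch.Reply below is hypothetical and belongs to neither MyXQL nor DBConnection, and the tagged input tuples are assumptions chosen for illustration. It only shows which outcome maps to which callback return shape.

```elixir
defmodule Sketch.Reply do
  # Hypothetical helper, not part of MyXQL or DBConnection.
  # The tagged inputs stand in for what a callback might observe:
  #   {:ok, value}            the server answered successfully
  #   {:err_packet, message}  the server rejected the request, the socket is still healthy
  #   {:error, reason}        :gen_tcp failed, the socket can no longer be trusted

  def to_callback_result({:ok, value}, state), do: {:ok, value, state}

  # A server-side error keeps the connection: report it and continue.
  def to_callback_result({:err_packet, message}, state) do
    {:error, %RuntimeError{message: message}, state}
  end

  # A socket-level error asks DBConnection to drop the socket and reconnect.
  def to_callback_result({:error, reason}, state) do
    {:disconnect, %RuntimeError{message: inspect(reason)}, state}
  end
end
```

The driver in this article uses MyXQL.Error rather than RuntimeError; the built-in exception is used here only to keep the sketch free of dependencies.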
timeout<\/code> to
:infinity<\/code> on our send\/recv calls. That’s because DBConnection manages the process these calls are executed in and maintains its own timeouts. (And if we hit these timeouts, it cleans up the socket automatically.)<\/p>\n
handle_execute\/4<\/code> callback:<\/p>\n
defmodule MyXQL.Protocol do\n  # ...\n\n  @impl true\n  def handle_execute(%{statement_id: statement_id} = query, params, _opts, state)\n      when is_integer(statement_id) do\n    data = encode_com_stmt_execute(statement_id, params)\n\n    with :ok <- sock_send(data, state),\n         {:ok, data} <- sock_recv(state),\n         resultset(columns: columns, rows: rows) <- decode_com_stmt_execute_response(data) do\n      columns = Enum.map(columns, &column_definition(&1, :name))\n      result = %MyXQL.Result{columns: columns, rows: rows}\n      {:ok, query, result, state}\n    else\n      err_packet(message: message) ->\n        {:error, %MyXQL.Error{message: \"error when executing query: #{message}\"}, state}\n\n      {:error, reason} ->\n        {:disconnect, %MyXQL.Error{message: \"error when executing query: #{inspect(reason)}\"}, state}\n    end\n  end\nend\n\ndefmodule MyXQL.Messages do\n  # ...\n\n  # https:\/\/dev.mysql.com\/doc\/internals\/en\/com-query-response.html#packet-ProtocolText::Resultset\n  defrecord :resultset, [:column_count, :columns, :row_count, :rows, :warning_count, :status_flags]\n\n  def decode_com_stmt_execute_response(data) do\n    # ...\n    resultset(...)\n  end\n\n  # https:\/\/dev.mysql.com\/doc\/internals\/en\/com-query-response.html#packet-Protocol::ColumnDefinition41\n  defrecord :column_definition, [:name, :type]\nend\n<\/code><\/pre>\n
handle_execute\/4<\/code> receives an already prepared query,
params<\/code> to encode, opts, and the state.<\/p>\n
handle_prepare\/3<\/code>, we encode the
COM_STMT_EXECUTE<\/code><\/a> packet, send it and receive a response, decode
COM_STMT_EXECUTE Response<\/code><\/a> into a
resultset<\/code> record, and finally build the result struct.<\/p>\n
ERR Packet<\/code> we return an
{:error, _, _}<\/code> response; on socket problems, we simply disconnect and let DBConnection handle re-connecting at a later time.<\/p>\n
DBConnection.Query<\/code> protocol is used to prepare queries, and in fact we could encode the params and decode the result in its implementation functions. We’ve left that part out for brevity.<\/p>\n
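As a rough idea of what that omitted part could look like, the sketch below fills in encode and decode. So that it runs without any dependency, it defines a hypothetical stand-in protocol, Sketch.Query, that mirrors the four DBConnection.Query functions (parse/2, describe/2, encode/3, decode/3). The struct names, the boolean conversion and the :row_mapper option are all assumptions made for illustration, not what MyXQL does.

```elixir
defprotocol Sketch.Query do
  # Hypothetical stand-in with the same four functions as DBConnection.Query.
  def parse(query, opts)
  def describe(query, opts)
  def encode(query, params, opts)
  def decode(query, result, opts)
end

defmodule Sketch.Result do
  defstruct [:columns, :rows]
end

defmodule Sketch.MyQuery do
  defstruct [:statement, :statement_id]

  defimpl Sketch.Query do
    def parse(query, _opts), do: query

    def describe(query, _opts), do: query

    # Normalise params before they reach the execute callback.
    # Assumption: booleans are sent as 1 and 0, since MySQL stores them as TINYINT(1).
    def encode(_query, params, _opts) do
      Enum.map(params, fn
        true -> 1
        false -> 0
        other -> other
      end)
    end

    # Post-process rows after the execute callback returns.
    # :row_mapper is a hypothetical option; it defaults to the identity function.
    def decode(_query, %Sketch.Result{rows: rows} = result, opts) do
      mapper = Keyword.get(opts, :row_mapper, & &1)
      %{result | rows: Enum.map(rows, mapper)}
    end
  end
end
```

With a real driver the same bodies would live in the defimpl of DBConnection.Query for MyXQL.Query, which keeps this work out of the connection process.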
defmodule MyXQL do\n # ...\n\n def prepare_execute(conn, statement, params, opts) do\n query = %MyXQL.Query{statement: statement}\n DBConnection.prepare_execute(conn, query, params, opts)\n end\nend\n<\/code><\/pre>\n
iex> {:ok, pid} = MyXQL.start_link([])\niex> MyXQL.prepare_execute(pid, \"SELECT ? + ?\", [2, 3], [])\n{:ok, %MyXQL.Query{statement: \"SELECT ? + ?\", statement_id: 1},\n %MyXQL.Result{columns: [\"? + ?\"], rows: [[5]]}}\n<\/code><\/pre>\n
MyXQL.start_link<\/code> are passed down to
\nDBConnection.start_link\/2<\/code><\/a>,
\nso starting a pool of 2 connections is as simple as:<\/p>\niex> {:ok, pid} = MyXQL.start_link(pool_size: 2)\n<\/code><\/pre>\n
Conclusion<\/h2>\n
\nmany benefits:<\/p>\n\n
:gen_tcp<\/code> functions without worrying about OTP messages and timeouts;
\nDBConnection will handle these<\/li>\n