From 072918f86bda7e45c72b0ce8bdf3a803e2978082 Mon Sep 17 00:00:00 2001 From: benonymus Date: Thu, 14 Dec 2023 08:47:05 +0700 Subject: [PATCH] switch the http library to httpoison --- lib/nsq/lookupd.ex | 28 ++++++------ mix.exs | 2 +- mix.lock | 9 +++- test/consumer_test.exs | 97 +++++++++++++++++++++--------------------- test/producer_test.exs | 8 ++-- 5 files changed, 77 insertions(+), 67 deletions(-) diff --git a/lib/nsq/lookupd.ex b/lib/nsq/lookupd.ex index c238cd7..331503b 100644 --- a/lib/nsq/lookupd.ex +++ b/lib/nsq/lookupd.ex @@ -38,20 +38,20 @@ defmodule NSQ.Lookupd do lookupd_url = "http://#{host}:#{port}/lookup?topic=#{topic}" headers = [{"Accept", "application/vnd.nsq; version=1.0"}] - case HTTPotion.get(lookupd_url, headers: headers) do - %HTTPotion.Response{status_code: 200, body: body, headers: headers} -> + case HTTPoison.get(lookupd_url, headers) do + {:ok, %HTTPoison.Response{status_code: 200, body: body, headers: headers}} -> normalize_200_response(headers, body) - %HTTPotion.Response{status_code: 404} -> + {:ok, %HTTPoison.Response{status_code: 404}} -> %{} |> normalize_response - %HTTPotion.Response{status_code: status, body: body} -> + {:ok, %HTTPoison.Response{status_code: status, body: body}} -> NSQ.Logger.error("Unexpected status code from #{lookupd_url}: #{status}") %{status_code: status, data: body} |> normalize_response - %HTTPotion.ErrorResponse{} = error -> + {:error, %HTTPoison.Error{} = error} -> NSQ.Logger.error("Error connecting to #{lookupd_url}: #{inspect(error)}") normalize_response(%{}) end @@ -61,13 +61,17 @@ defmodule NSQ.Lookupd do defp normalize_200_response(headers, body) do body = if body == nil || body == "", do: "{}", else: body - if headers[:"X-Nsq-Content-Type"] == "nsq; version=1.0" do - body - |> Jason.decode!() - |> normalize_response - else - %{status_code: 200, status_txt: "OK", data: body} - |> normalize_response + header = "X-Nsq-Content-Type" + + case List.keyfind(headers, header, 0) do + {^header, "nsq; version=1.0"} -> + body + |> Jason.decode!() + |> normalize_response + + _ -> + %{status_code: 200, status_txt: "OK", data: body} + |> normalize_response end end diff --git a/mix.exs b/mix.exs index 69eb8fe..4de7626 100644 --- a/mix.exs +++ b/mix.exs @@ -34,10 +34,10 @@ defmodule ElixirNsq.Mixfile do # Type "mix help deps" for more examples and options defp deps do [ - {:httpotion, "~> 3.2"}, {:elixir_uuid, "~> 1.2"}, {:socket2, "~> 2.1"}, {:jason, "~> 1.4"}, + {:httpoison, "~> 2.0"}, # testing {:secure_random, "~> 0.5", only: :test}, diff --git a/mix.lock b/mix.lock index 3087b61..b06eb92 100644 --- a/mix.lock +++ b/mix.lock @@ -6,14 +6,18 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"}, - "httpotion": {:hex, :httpotion, "3.2.0", "007c81c3a15b4860c893dea858eab2ce859a260b47071e85dcf9611a4226324e", [:mix], [{:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "726b3fdfc47d7f15302dac5f6a4152a5002fe8230dee8bdd65b8d154b573580b"}, - "ibrowse": {:hex, :ibrowse, "4.4.0", "2d923325efe0d2cb09b9c6a047b2835a5eda69d8a47ed6ff8bc03628b764e991", [:rebar3], [], "hexpm", "6a8e5988872086f0506bef68311493551ac5beae7c06ba2a00d5e9f97a60f1c2"}, + "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, + "httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"}, + "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"}, + "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, + "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "plug": {:hex, :plug, "1.15.2", "94cf1fa375526f30ff8770837cb804798e0045fd97185f0bb9e5fcd858c792a3", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02731fa0c2dcb03d8d21a1d941bdbbe99c2946c0db098eee31008e04c6283615"}, "plug_cowboy": {:hex, :plug_cowboy, "2.6.1", "9a3bbfceeb65eff5f39dab529e5cd79137ac36e913c02067dba3963a26efe9b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "de36e1a21f451a18b790f37765db198075c25875c64834bcc82d90b309eb6613"}, "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, @@ -22,4 +26,5 @@ "socket2": {:hex, :socket2, "2.1.1", "850a8e90963358e1e9ba53efe4e91272f2b39489e3f2a82d291010b20539e8c3", [:mix], [{:certifi, "~> 2.12", [hex: :certifi, repo: "hexpm", optional: false]}], "hexpm", "37face03ee9c98e100084fa202a5291c2200a9b4e661dad7b163c9efdb0a2865"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } diff --git a/test/consumer_test.exs b/test/consumer_test.exs index d3757df..4e2794c 100644 --- a/test/consumer_test.exs +++ b/test/consumer_test.exs @@ -20,7 +20,7 @@ defmodule NSQ.ConsumerTest do doctest NSQ.Consumer alias NSQ.Consumer, as: Cons alias NSQ.Consumer.Helpers, as: H - alias HTTPotion, as: HTTP + alias HTTPoison, as: HTTP alias NSQ.Consumer.Connections alias NSQ.Connection, as: Conn alias NSQ.ConnInfo @@ -30,10 +30,10 @@ defmodule NSQ.ConsumerTest do setup do NSQ.Logger.configure(level: :warn) - HTTP.post("http://127.0.0.1:6751/topic/delete?topic=#{@test_topic}") - HTTP.post("http://127.0.0.1:6761/topic/delete?topic=#{@test_topic}") - HTTP.post("http://127.0.0.1:6771/topic/delete?topic=#{@test_topic}") - HTTP.post("http://127.0.0.1:6781/topic/delete?topic=#{@test_topic}") + HTTP.post("http://127.0.0.1:6751/topic/delete?topic=#{@test_topic}", "") + HTTP.post("http://127.0.0.1:6761/topic/delete?topic=#{@test_topic}", "") + HTTP.post("http://127.0.0.1:6771/topic/delete?topic=#{@test_topic}", "") + HTTP.post("http://127.0.0.1:6781/topic/delete?topic=#{@test_topic}", "") :ok end @@ -59,10 +59,10 @@ defmodule NSQ.ConsumerTest do NSQ.Consumer.event_manager(consumer) |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "hello") assert_receive {:message_finished, _}, 2000 - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "too_slow") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "too_slow") assert_receive {:message_requeued, _}, 2000 end @@ -88,7 +88,7 @@ defmodule NSQ.ConsumerTest do NSQ.Consumer.event_manager(consumer) |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "hello") # Without touch, this message would fail after 1 second. So we test that # it takes longer than 1 second but succeeds. @@ -111,12 +111,12 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") - HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", body: "hello") - HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", body: "hello") - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") - HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", body: "hello") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "hello") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "hello") + HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", "hello") + HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", "hello") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "hello") + HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", "hello") :timer.sleep(100) [info1, info2] = NSQ.Consumer.conn_info(consumer) |> Map.values() @@ -155,14 +155,14 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "fast") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "fast") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "slow") - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "medium") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "slow") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "medium") NSQ.Consumer.close(consumer) :timer.sleep(50) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "fast") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "fast") refute_receive(:handled, 2000) end @@ -181,7 +181,7 @@ defmodule NSQ.ConsumerTest do NSQ.Consumer.event_manager(consumer) |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) assert_receive({:message, %NSQ.Message{}}, 2000) @@ -213,11 +213,11 @@ defmodule NSQ.ConsumerTest do previous_timestamp = info.last_msg_timestamp :timer.sleep(1000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "ok") - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "req") - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "req2000") - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "fail") - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "backoff") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "ok") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "req") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "req2000") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "fail") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "backoff") assert_receive({:message, _}, 2000) assert_receive({:message, _}, 2000) @@ -256,7 +256,7 @@ defmodule NSQ.ConsumerTest do }) # Send a message so we can be sure the connection is up and working first. - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) # Abruptly close the connection @@ -281,7 +281,7 @@ defmodule NSQ.ConsumerTest do assert conn1 != conn2 # Send another message so we can verify the new connection is working. - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) end @@ -298,10 +298,10 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) end @@ -320,8 +320,8 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") - HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") + HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) assert_receive(:handled, 2000) @@ -383,8 +383,9 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/mpub?topic=#{@test_topic}", - body: "mpubtest\nmpubtest\nmpubtest" + HTTP.post( + "http://127.0.0.1:6751/mpub?topic=#{@test_topic}", + "mpubtest\nmpubtest\nmpubtest" ) assert_receive(:handled, 2000) @@ -412,7 +413,7 @@ defmodule NSQ.ConsumerTest do }) Enum.map(1..1000, fn _i -> - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") end) assert_receive_n_times(:handled, 1000, 2000) @@ -468,11 +469,11 @@ defmodule NSQ.ConsumerTest do # Send one successful message through so our subsequent timing is more # predictable. - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive({:message_finished, _}, 5000) # Our message handler enters into backoff mode and requeues the message. - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive({:message_requeued, _}, 2000) assert_receive(:backoff, 1000) @@ -528,7 +529,7 @@ defmodule NSQ.ConsumerTest do assert_receive({:message_finished, _}, 2000) # Send a successful message and leave backoff mode! (I hope!) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive({:message_finished, _}, 2000) assert_receive(:resume, 100) cons_state = Cons.get_state(consumer) @@ -558,7 +559,7 @@ defmodule NSQ.ConsumerTest do cons = Cons.get(cons_sup_pid) [conn] = Connections.get(cons) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") refute_receive :handled, 500 cons_state = Cons.get_state(cons) assert ConnInfo.fetch(cons_state, conn, :retry_rdy_pid) == nil @@ -639,10 +640,10 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) end @@ -665,7 +666,7 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") refute_receive(:handled, 2000) end end @@ -688,12 +689,12 @@ defmodule NSQ.ConsumerTest do assert NSQ.Consumer.starved?(consumer) == false # One message in flight, 50% of last_rdy, not starved - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive({:message, _}, 2000) assert NSQ.Consumer.starved?(consumer) == false # Two messages in flight, 100% of last_rdy, __starved__ - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive({:message, _}, 2000) assert NSQ.Consumer.starved?(consumer) == true @@ -718,10 +719,10 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) end @@ -739,10 +740,10 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) end @@ -767,10 +768,10 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", "HTTP message") assert_receive(:handled, 2000) end diff --git a/test/producer_test.exs b/test/producer_test.exs index 97ce803..d5ca297 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -8,10 +8,10 @@ defmodule NSQ.ProducerTest do setup do NSQ.Logger.configure(level: :warn) - HTTPotion.post("http://127.0.0.1:6751/topic/delete?topic=#{@test_topic}") - HTTPotion.post("http://127.0.0.1:6761/topic/delete?topic=#{@test_topic}") - HTTPotion.post("http://127.0.0.1:6771/topic/delete?topic=#{@test_topic}") - HTTPotion.post("http://127.0.0.1:6781/topic/delete?topic=#{@test_topic}") + HTTPoison.post("http://127.0.0.1:6751/topic/delete?topic=#{@test_topic}", "") + HTTPoison.post("http://127.0.0.1:6761/topic/delete?topic=#{@test_topic}", "") + HTTPoison.post("http://127.0.0.1:6771/topic/delete?topic=#{@test_topic}", "") + HTTPoison.post("http://127.0.0.1:6781/topic/delete?topic=#{@test_topic}", "") :ok end