Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 75 additions & 45 deletions ex/lib/http/multiserver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Ama.MultiServer do
:socket_ack -> :ok
after 3000 -> throw(:no_socket_passed) end

HTTP.RateLimiter.setup()
state = Map.put(state, :request, %{buf: <<>>})
:ok = :inet.setopts(state.socket, [{:active, :once}])
loop_http(state)
Expand Down Expand Up @@ -48,6 +49,21 @@ defmodule Ama.MultiServer do
state
end

defp client_ip(socket) do
case :inet.peername(socket) do
{:ok, {ip, _port}} -> ip
_ -> :unknown
end
end

defp rate_limited_reply(state, limit, window_ms, content_fn) do
ip = client_ip(state.socket)
case HTTP.RateLimiter.check(ip, limit, window_ms) do
:ok -> content_fn.()
:rate_limited -> quick_reply(state, JSX.encode!(%{error: :rate_limited}), 429)
end
end

defp prometheus_reply(state, content_fn) do
if HTTP.Prometheus.authorized?(state.request.headers) do
:ok = :gen_tcp.send(state.socket, Photon.HTTP.Response.build_cors(state.request, 200, %{"content-type" => "text/plain; version=0.0.4"}, content_fn.()))
Expand Down Expand Up @@ -209,42 +225,52 @@ defmodule Ama.MultiServer do
r.method == "GET" and String.starts_with?(r.path, "/api/chain/tx_events_by_account/") ->
query = r.query && Photon.HTTP.parse_query(r.query)
account = String.replace(r.path, "/api/chain/tx_events_by_account/", "")
filters = %{limit: query[:limit] || "100", offset: query[:offset] || "0", sort: query[:sort] || "asc"}
filters = %{
limit: :erlang.binary_to_integer(filters.limit),
offset: :erlang.binary_to_integer(filters.offset),
sort: case filters.sort do "desc" -> :desc; _ -> :asc end,
cursor: if query[:cursor_b58] do Base58.decode(query.cursor_b58) else query[:cursor] end,
contract: if query[:contract_b58] do Base58.decode(query.contract_b58) else query[:contract] end,
function: query[:function],
}
{cursor, txs} = cond do
query[:type] == "sent" -> API.TX.get_by_address_sent(account, filters)
query[:type] == "recv" -> API.TX.get_by_address_recv(account, filters)
true -> API.TX.get_by_address(account, filters)
raw_limit = query[:limit] || "100"
raw_offset = query[:offset] || "0"
case {Integer.parse(raw_limit), Integer.parse(raw_offset)} do
{{limit, ""}, {offset, ""}} when limit >= 0 and offset >= 0 ->
filters = %{
limit: limit,
offset: offset,
sort: case query[:sort] || "asc" do "desc" -> :desc; _ -> :asc end,
cursor: if query[:cursor_b58] do Base58.decode(query.cursor_b58) else query[:cursor] end,
contract: if query[:contract_b58] do Base58.decode(query.contract_b58) else query[:contract] end,
function: query[:function],
}
{cursor, txs} = cond do
query[:type] == "sent" -> API.TX.get_by_address_sent(account, filters)
query[:type] == "recv" -> API.TX.get_by_address_recv(account, filters)
true -> API.TX.get_by_address(account, filters)
end
result = %{cursor: cursor, txs: txs}
quick_reply(state, result)
_ ->
quick_reply(state, %{error: :invalid_parameters}, 400)
end
result = %{cursor: cursor, txs: txs}
quick_reply(state, result)

r.method == "GET" and String.starts_with?(r.path, "/api/chain/tx_by_filter") ->
query = r.query && Photon.HTTP.parse_query(r.query)

signer = query[:signer] || query[:sender] || query[:pk]
arg0 = query[:arg0] || query[:receiver]

filters = %{
signer: signer && Base58.decode(signer),
arg0: arg0 && Base58.decode(arg0),
contract: if query[:contract_b58] do Base58.decode(query.contract_b58) else query[:contract] end,
function: query[:function],

limit: :erlang.binary_to_integer(query[:limit] || "100"),
sort: case query[:sort] do "desc" -> :desc; _ -> :asc end,
cursor: query[:cursor] && Base58.decode(query.cursor),
}
{cursor, txs} = API.TX.get_by_filter(filters)
result = %{cursor: cursor, txs: txs}
quick_reply(state, result)
case Integer.parse(query[:limit] || "100") do
{limit, ""} when limit >= 0 ->
filters = %{
signer: signer && Base58.decode(signer),
arg0: arg0 && Base58.decode(arg0),
contract: if query[:contract_b58] do Base58.decode(query.contract_b58) else query[:contract] end,
function: query[:function],
limit: limit,
sort: case query[:sort] do "desc" -> :desc; _ -> :asc end,
cursor: query[:cursor] && Base58.decode(query.cursor),
}
{cursor, txs} = API.TX.get_by_filter(filters)
result = %{cursor: cursor, txs: txs}
quick_reply(state, result)
_ ->
quick_reply(state, %{error: :invalid_parameters}, 400)
end

r.method == "GET" and String.starts_with?(r.path, "/api/chain/txs_in_entry/") ->
entry_hash = String.replace(r.path, "/api/chain/txs_in_entry/", "")
Expand Down Expand Up @@ -305,25 +331,29 @@ defmodule Ama.MultiServer do
quick_reply(%{state|request: r}, result)

testnet and r.method == "GET" and r.path == "/api/upow/seed" ->
epoch = DB.Chain.epoch()
segment_vr_hash = DB.Chain.segment_vr_hash()
nonce = :crypto.strong_rand_bytes(12)
%{pk: pk, pop: pop} = Application.fetch_env!(:ama, :keys) |> hd()
seed = <<epoch::32-little, segment_vr_hash::32-binary,
pk::48-binary, pop::96-binary, pk::binary, nonce::12-binary>>
quick_reply(state, seed)
rate_limited_reply(state, 10, 60_000, fn ->
epoch = DB.Chain.epoch()
segment_vr_hash = DB.Chain.segment_vr_hash()
nonce = :crypto.strong_rand_bytes(12)
%{pk: pk, pop: pop} = Application.fetch_env!(:ama, :keys) |> hd()
seed = <<epoch::32-little, segment_vr_hash::32-binary,
pk::48-binary, pop::96-binary, pk::binary, nonce::12-binary>>
quick_reply(state, seed)
end)

testnet and r.method == "GET" and r.path == "/api/upow/seed_with_matrix_a_b" ->
epoch = DB.Chain.epoch()
segment_vr_hash = DB.Chain.segment_vr_hash()
nonce = :crypto.strong_rand_bytes(12)
%{pk: pk, pop: pop} = Application.fetch_env!(:ama, :keys) |> hd()
seed = <<epoch::32-little, segment_vr_hash::32-binary,
pk::48-binary, pop::96-binary, pk::binary, nonce::12-binary>>
b = Blake3.new()
Blake3.update(b, seed)
matrix_a_b = Blake3.finalize_xof(b, 16*50240 + 50240*16)
quick_reply(state, seed <> matrix_a_b)
rate_limited_reply(state, 2, 60_000, fn ->
epoch = DB.Chain.epoch()
segment_vr_hash = DB.Chain.segment_vr_hash()
nonce = :crypto.strong_rand_bytes(12)
%{pk: pk, pop: pop} = Application.fetch_env!(:ama, :keys) |> hd()
seed = <<epoch::32-little, segment_vr_hash::32-binary,
pk::48-binary, pop::96-binary, pk::binary, nonce::12-binary>>
b = Blake3.new()
Blake3.update(b, seed)
matrix_a_b = Blake3.finalize_xof(b, 16*50240 + 50240*16)
quick_reply(state, seed <> matrix_a_b)
end)

testnet and r.method == "GET" and String.starts_with?(r.path, "/api/upow/validate/") ->
sol = String.replace(r.path, "/api/upow/validate/", "") |> Base58.decode()
Expand Down
26 changes: 26 additions & 0 deletions ex/lib/http/rate_limiter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule HTTP.RateLimiter do
@table :http_rate_limiter

def setup do
if :ets.whereis(@table) == :undefined do
:ets.new(@table, [:named_table, :public, :set])
end
end

# Returns :ok or :rate_limited
def check(ip, limit, window_ms) do
now = System.monotonic_time(:millisecond)
case :ets.lookup(@table, ip) do
[{^ip, count, window_start}] when now - window_start < window_ms ->
if count >= limit do
:rate_limited
else
:ets.insert(@table, {ip, count + 1, window_start})
:ok
end
_ ->
:ets.insert(@table, {ip, 1, now})
:ok
end
end
end