Powered by AppSignal & Oban Pro
Would you like to see your link here? Contact us

AI Gateway

livebooks/cloudflare/ai_gateway.livemd

AI Gateway

Mix.install([
  {:aws, "~> 1.0"},
  {:hackney, "~> 1.20"},
  {:kino, "~> 0.14"},
  {:req, "~> 0.5"}
])

Call Bedrock with the AWS.BedrockRuntime module

# Inputs for the AWS credentials/region used to sign Bedrock requests.
# The key values are collected with password inputs.
aws_access_key_id_input = Kino.Input.password("AWS ACCESS_KEY_ID")
aws_secret_access_key_input = Kino.Input.password("AWS SECRET_ACCESS_KEY")
aws_region_input = Kino.Input.text("AWS REGION")

# Inputs identifying the Cloudflare account and the AI Gateway to route through.
cf_account_id_input = Kino.Input.password("CLOUDFLARE ACCOUNT_ID")
cf_gateway_name_input = Kino.Input.text("CLOUDFLARE GATEWAY_NAME")

# Lay all five inputs out in a three-column grid.
Kino.Layout.grid(
  [
    aws_access_key_id_input,
    aws_secret_access_key_input,
    aws_region_input,
    cf_account_id_input,
    cf_gateway_name_input
  ],
  columns: 3
)
# Build the AWS client from the current values of the three AWS inputs
# (access key id, secret access key, region — in that order).
aws_client =
  [aws_access_key_id_input, aws_secret_access_key_input, aws_region_input]
  |> Enum.map(&Kino.Input.read/1)
  |> then(fn [access_key_id, secret_access_key, region] ->
    AWS.Client.create(access_key_id, secret_access_key, region)
  end)
# Bedrock model identifier used by every request in this notebook.
model_id = "anthropic.claude-3-5-sonnet-20240620-v1:0"

# Prompt text (Japanese for "Please tell me your name.").
input = "あなたの名前を教えてください。"

# Request body: a single user message with one text content part.
user_message = %{"role" => "user", "content" => [%{"text" => input}]}
payload = %{"messages" => [user_message]}

# Options handed to the AWS client calls below.
# NOTE(review): presumably the HTTP client's receive timeout in milliseconds — confirm.
options = [recv_timeout: 60_000]
# Call the Bedrock Converse API through the generated AWS.BedrockRuntime client.
#
# The return value is matched explicitly instead of taking `elem(1)`: with
# `elem(1)` an `{:error, reason}` tuple would bind `reason` to `result`, and the
# failure would only show up later as an unrelated crash while reading the body.
result =
  case AWS.BedrockRuntime.converse(aws_client, model_id, payload, options) do
    {:ok, body, _http_response} -> body
    {:error, reason} -> raise "Bedrock converse failed: #{inspect(reason)}"
  end
# Walk result -> "output" -> "message" -> "content", take the first content
# part, and render its "text" as Markdown.
output = Map.get(result, "output")
message = Map.get(output, "message")
first_part = hd(Map.get(message, "content"))

Kino.Markdown.new(Map.get(first_part, "text"))

Call Bedrock with AWS.Client.request

# Region typed into the AWS REGION input; it selects the regional endpoint.
aws_region = Kino.Input.read(aws_region_input)

# Regional Bedrock runtime host the request is signed for.
host = "bedrock-runtime.#{aws_region}.amazonaws.com"

# Headers passed to the SigV4 signer.
headers = [
  {"Host", host},
  {"Content-Type", "application/json"}
]

# The model id is URI-encoded before being placed in the path.
encoded_model_id = AWS.Util.encode_uri(model_id)
encoded_path = "/model/#{encoded_model_id}/converse"

url = "https://" <> host <> encoded_path
# Signing timestamp, truncated to whole seconds.
now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

# JSON-encode the payload with the client's configured encoder.
encoded_payload = AWS.Client.encode!(aws_client, payload, :json)

# Sign with a copy of the client whose service is set to "bedrock".
# NOTE(review): presumably the SigV4 signing name expected for Bedrock runtime — confirm.
signing_client = %{aws_client | service: "bedrock"}

# The variable name `siged_headers` (sic) is kept because later cells read it.
siged_headers =
  AWS.Signature.sign_v4(signing_client, now, :post, url, headers, encoded_payload)

# Send the signed request straight to Bedrock, without the gateway.
AWS.Client.request(aws_client, :post, url, encoded_payload, siged_headers, options)

Call Bedrock with AI Gateway

# Cloudflare account id and gateway name from the inputs at the top of the notebook.
cf_account_id = Kino.Input.read(cf_account_id_input)
cf_gateway_name = Kino.Input.read(cf_gateway_name_input)
cf_host = "gateway.ai.cloudflare.com"

# Gateway URL: account / gateway / provider prefix / region, followed by the
# same encoded Bedrock path that was signed above.
gw_base =
  Enum.join(
    [
      "https://#{cf_host}",
      "v1",
      cf_account_id,
      cf_gateway_name,
      "aws-bedrock",
      "bedrock-runtime",
      aws_region
    ],
    "/"
  )

gw_url = gw_base <> encoded_path
# Reuse the signed headers for the gateway request, replacing only the value of
# the "Host" header with the gateway host; every other header is kept unchanged.
gw_header =
  Enum.map(siged_headers, fn
    {"Host", _bedrock_host} -> {"Host", cf_host}
    {key, value} -> {key, value}
  end)
# POST the signed request to the AI Gateway and keep only the response body.
#
# `connect_options: [timeout: ...]` only bounds how long the connection may take
# to open. The wait for the response is bounded by `:receive_timeout`, which Req
# defaults to 15 seconds, so it is raised to the same 60 seconds used for the
# direct AWS call.
result =
  [url: gw_url, headers: gw_header, body: encoded_payload]
  |> Req.new()
  |> Req.post!(connect_options: [timeout: 60_000], receive_timeout: 60_000)
  |> Map.get(:body)

Define the AI Gateway module

defmodule AIGateway do
  @moduledoc """
  Calls the Amazon Bedrock `converse` endpoint through Cloudflare AI Gateway.

  The request is SigV4-signed against the regional Bedrock runtime URL and then
  posted to the gateway URL, with the `Host` header replaced by the gateway host.
  """

  @cf_host "gateway.ai.cloudflare.com"
  @default_timeout 60_000

  @doc """
  Sends `input` as a single user message to `model_id` via the AI Gateway.

  ## Parameters

    * `aws_client` - AWS client; its credentials sign the request and its
      `region` selects the Bedrock endpoint.
    * `model_id` - Bedrock model identifier (URI-encoded into the path).
    * `cf_account_id` - Cloudflare account id used in the gateway URL.
    * `cf_gateway_name` - name of the AI Gateway used in the gateway URL.
    * `input` - text placed in the user message.

  ## Options

    * `:receive_timeout` - how long to wait for the response, in milliseconds.
      Defaults to `#{@default_timeout}`.

  Returns the `:body` of the Req response. The HTTP status is not checked here.
  Raises whatever `Req.post!/2` raises when the request itself fails.
  """
  @spec invoke(AWS.Client.t(), String.t(), String.t(), String.t(), String.t(), keyword()) ::
          term()
  def invoke(aws_client, model_id, cf_account_id, cf_gateway_name, input, opts \\ []) do
    receive_timeout = Keyword.get(opts, :receive_timeout, @default_timeout)

    host = "bedrock-runtime.#{aws_client.region}.amazonaws.com"
    encoded_path = "/model/#{AWS.Util.encode_uri(model_id)}/converse"
    encoded_payload = AWS.Client.encode!(aws_client, converse_payload(input), :json)

    headers = [
      {"Host", host},
      {"Content-Type", "application/json"}
    ]

    # The signature is computed for the real Bedrock URL, not the gateway URL.
    gw_headers =
      sign_headers(aws_client, "https://#{host}#{encoded_path}", headers, encoded_payload)

    gw_url =
      "https://#{@cf_host}/v1/#{cf_account_id}/#{cf_gateway_name}" <>
        "/aws-bedrock/bedrock-runtime/#{aws_client.region}#{encoded_path}"

    # `connect_options: [timeout: ...]` only bounds opening the connection; the
    # wait for the response is bounded by `:receive_timeout` (Req defaults it to
    # 15 seconds), so it is set explicitly.
    [url: gw_url, headers: gw_headers, body: encoded_payload]
    |> Req.new()
    |> Req.post!(connect_options: [timeout: @default_timeout], receive_timeout: receive_timeout)
    |> Map.get(:body)
  end

  # Builds the converse request body: one user message with one text part.
  defp converse_payload(input) do
    %{
      "messages" => [
        %{
          "role" => "user",
          "content" => [%{"text" => input}]
        }
      ]
    }
  end

  # Signs a POST to `url` with SigV4 using the "bedrock" service name, then
  # replaces the value of the "Host" header with the gateway host.
  defp sign_headers(aws_client, url, headers, payload) do
    now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)

    %{aws_client | service: "bedrock"}
    |> AWS.Signature.sign_v4(now, :post, url, headers, payload)
    |> Enum.map(fn
      {"Host", _bedrock_host} -> {"Host", @cf_host}
      {key, value} -> {key, value}
    end)
  end
end
# Free-form instruction to send to the model.
# NOTE(review): the input presumably has to be rendered (its own cell) before it is read — confirm.
instruction_input = Kino.Input.textarea("INSTRUCTION")
instruction = Kino.Input.read(instruction_input)

# Send the instruction through the gateway, then render the text of the first
# content part of the returned message as Markdown.
gateway_body =
  AIGateway.invoke(aws_client, model_id, cf_account_id, cf_gateway_name, instruction)

gateway_message = gateway_body |> Map.get("output") |> Map.get("message")
first_gateway_part = hd(Map.get(gateway_message, "content"))

Kino.Markdown.new(Map.get(first_gateway_part, "text"))

Get logs

# API token used as the Bearer credential for the Cloudflare API request below.
cf_token_input = Kino.Input.password("CLOUDFLARE TOKEN")

# Logs endpoint of the selected gateway in the Cloudflare REST API.
cf_api_url =
  Enum.join(
    [
      "https://api.cloudflare.com/client/v4/accounts",
      cf_account_id,
      "ai-gateway/gateways",
      cf_gateway_name,
      "logs"
    ],
    "/"
  )

cf_token = Kino.Input.read(cf_token_input)

cf_api_headers = [
  {"Authorization", "Bearer #{cf_token}"},
  {"Content-Type", "application/json"}
]

# Fetch the logs and keep only the response body.
logs_response =
  [url: cf_api_url, headers: cf_api_headers]
  |> Req.new()
  |> Req.get!()

result = logs_response.body