From 75290cbfbd0f93cf3ecec5f44c4624b8c8601c51 Mon Sep 17 00:00:00 2001
From: Egor Kislitsyn <egor@kislitsyn.com>
Date: Thu, 26 Sep 2019 18:49:57 +0700
Subject: [PATCH 1/4] Add Pleroma.JobQueueMonitor

---
 lib/pleroma/application.ex           |   1 +
 lib/pleroma/healthcheck.ex           |   8 ++
 lib/pleroma/job_queue_monitor.ex     | 115 +++++++++++++++++++++++++++
 lib/pleroma/workers/worker_helper.ex |   1 +
 test/healthcheck_test.exs            |   9 ++-
 5 files changed, 133 insertions(+), 1 deletion(-)
 create mode 100644 lib/pleroma/job_queue_monitor.ex

diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 7aec2c545..3e21d4403 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -42,6 +42,7 @@ defmodule Pleroma.Application do
         hackney_pool_children() ++
         [
           Pleroma.Stats,
+          Pleroma.JobQueueMonitor,
           {Oban, Pleroma.Config.get(Oban)}
         ] ++
         task_children(@env) ++
diff --git a/lib/pleroma/healthcheck.ex b/lib/pleroma/healthcheck.ex
index 977b78c26..fc2129815 100644
--- a/lib/pleroma/healthcheck.ex
+++ b/lib/pleroma/healthcheck.ex
@@ -14,6 +14,7 @@ defmodule Pleroma.Healthcheck do
             active: 0,
             idle: 0,
             memory_used: 0,
+            job_queue_stats: nil,
             healthy: true
 
   @type t :: %__MODULE__{
@@ -21,6 +22,7 @@ defmodule Pleroma.Healthcheck do
           active: non_neg_integer(),
           idle: non_neg_integer(),
           memory_used: number(),
+          job_queue_stats: map(),
           healthy: boolean()
         }
 
@@ -30,6 +32,7 @@ defmodule Pleroma.Healthcheck do
       memory_used: Float.round(:erlang.memory(:total) / 1024 / 1024, 2)
     }
     |> assign_db_info()
+    |> assign_job_queue_stats()
     |> check_health()
   end
 
@@ -55,6 +58,11 @@ defmodule Pleroma.Healthcheck do
     Map.merge(healthcheck, db_info)
   end
 
+  defp assign_job_queue_stats(healthcheck) do
+    stats = Pleroma.JobQueueMonitor.stats()
+    Map.put(healthcheck, :job_queue_stats, stats)
+  end
+
   @spec check_health(Healthcheck.t()) :: Healthcheck.t()
   def check_health(%{pool_size: pool_size, active: active} = check)
       when active >= pool_size do
diff --git a/lib/pleroma/job_queue_monitor.ex b/lib/pleroma/job_queue_monitor.ex
new file mode 100644
index 000000000..685ba2ead
--- /dev/null
+++ b/lib/pleroma/job_queue_monitor.ex
@@ -0,0 +1,115 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.JobQueueMonitor do
+  use GenServer
+
+  @initial_state %{workers: %{}, queues: %{}, processed_jobs: 0, enqueued: 0}
+  @queue %{processed_jobs: 0, success: 0, failure: 0, enqueued: 0}
+  @operation %{processed_jobs: 0, success: 0, failure: 0, enqueued: 0}
+
+  def start_link(_) do
+    GenServer.start_link(__MODULE__, @initial_state, name: __MODULE__)
+  end
+
+  @impl true
+  def init(state) do
+    :telemetry.attach("oban-monitor-failure", [:oban, :failure], &handle_event/4, nil)
+    :telemetry.attach("oban-monitor-success", [:oban, :success], &handle_event/4, nil)
+
+    {:ok, state}
+  end
+
+  def stats do
+    GenServer.call(__MODULE__, :stats)
+  end
+
+  def enqueue({:ok, job}) do
+    meta = Map.take(job, [:args, :queue, :worker])
+    GenServer.cast(__MODULE__, {:process_enqueue, meta})
+
+    {:ok, job}
+  end
+
+  def enqueue(result), do: result
+
+  def handle_event([:oban, status], %{duration: duration}, meta, _) do
+    GenServer.cast(__MODULE__, {:process_event, status, duration, meta})
+  end
+
+  @impl true
+  def handle_call(:stats, _from, state) do
+    {:reply, state, state}
+  end
+
+  def handle_cast({:process_enqueue, meta}, state) do
+    state =
+      state
+      |> Map.update!(:workers, fn workers ->
+        workers
+        |> Map.put_new(meta.worker, %{})
+        |> Map.update!(meta.worker, &update_worker(&1, :enqueue, meta))
+      end)
+      |> Map.update!(:queues, fn workers ->
+        workers
+        |> Map.put_new(meta.queue, @queue)
+        |> Map.update!(meta.queue, fn queue -> Map.update!(queue, :enqueued, &(&1 + 1)) end)
+      end)
+      |> Map.update!(:enqueued, &(&1 + 1))
+
+    {:noreply, state}
+  end
+
+  @impl true
+  def handle_cast({:process_event, status, duration, meta}, state) do
+    state =
+      state
+      |> Map.update!(:workers, fn workers ->
+        workers
+        |> Map.put_new(meta.worker, %{})
+        |> Map.update!(meta.worker, &update_worker(&1, status, meta, duration))
+      end)
+      |> Map.update!(:queues, fn workers ->
+        workers
+        |> Map.put_new(meta.queue, @queue)
+        |> Map.update!(meta.queue, &update_queue(&1, status, meta, duration))
+      end)
+      |> Map.update!(:processed_jobs, &(&1 + 1))
+      |> decr_enqueued()
+
+    {:noreply, state}
+  end
+
+  defp update_worker(worker, status, meta, duration \\ 0) do
+    worker
+    |> Map.put_new(meta.args["op"], @operation)
+    |> Map.update!(meta.args["op"], &update_op(&1, status, meta, duration))
+  end
+
+  defp update_op(op, :enqueue, _meta, _duration) do
+    op
+    |> Map.update!(:enqueued, &(&1 + 1))
+  end
+
+  defp update_op(op, status, _meta, _duration) do
+    op
+    |> Map.update!(:processed_jobs, &(&1 + 1))
+    |> Map.update!(status, &(&1 + 1))
+    |> decr_enqueued()
+  end
+
+  defp update_queue(queue, status, _meta, _duration) do
+    queue
+    |> Map.update!(:processed_jobs, &(&1 + 1))
+    |> Map.update!(status, &(&1 + 1))
+    |> decr_enqueued()
+  end
+
+  defp decr_enqueued(map) do
+    Map.update!(map, :enqueued, fn
+      0 -> 0
+      enqueued -> enqueued - 1
+    end)
+  end
+end
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
index 358efa14a..a43ce8bc0 100644
--- a/lib/pleroma/workers/worker_helper.ex
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -40,6 +40,7 @@ defmodule Pleroma.Workers.WorkerHelper do
         unquote(caller_module)
         |> apply(:new, [params, worker_args])
         |> Pleroma.Repo.insert()
+        |> Pleroma.JobQueueMonitor.enqueue()
       end
     end
   end
diff --git a/test/healthcheck_test.exs b/test/healthcheck_test.exs
index 6bb8d5b7f..66d5026ff 100644
--- a/test/healthcheck_test.exs
+++ b/test/healthcheck_test.exs
@@ -9,7 +9,14 @@ defmodule Pleroma.HealthcheckTest do
   test "system_info/0" do
     result = Healthcheck.system_info() |> Map.from_struct()
 
-    assert Map.keys(result) == [:active, :healthy, :idle, :memory_used, :pool_size]
+    assert Map.keys(result) == [
+             :active,
+             :healthy,
+             :idle,
+             :job_queue_stats,
+             :memory_used,
+             :pool_size
+           ]
   end
 
   describe "check_health/1" do

From 26693292f4b2062504fc9a24e824a6f56cb6b555 Mon Sep 17 00:00:00 2001
From: Egor Kislitsyn <egor@kislitsyn.com>
Date: Wed, 2 Oct 2019 14:50:25 +0700
Subject: [PATCH 2/4] Remove `:enqueued` counter

---
 lib/pleroma/job_queue_monitor.ex     | 45 +++-------------------------
 lib/pleroma/workers/worker_helper.ex |  1 -
 2 files changed, 4 insertions(+), 42 deletions(-)

diff --git a/lib/pleroma/job_queue_monitor.ex b/lib/pleroma/job_queue_monitor.ex
index 685ba2ead..3feea8381 100644
--- a/lib/pleroma/job_queue_monitor.ex
+++ b/lib/pleroma/job_queue_monitor.ex
@@ -5,9 +5,9 @@
 defmodule Pleroma.JobQueueMonitor do
   use GenServer
 
-  @initial_state %{workers: %{}, queues: %{}, processed_jobs: 0, enqueued: 0}
-  @queue %{processed_jobs: 0, success: 0, failure: 0, enqueued: 0}
-  @operation %{processed_jobs: 0, success: 0, failure: 0, enqueued: 0}
+  @initial_state %{workers: %{}, queues: %{}, processed_jobs: 0}
+  @queue %{processed_jobs: 0, success: 0, failure: 0}
+  @operation %{processed_jobs: 0, success: 0, failure: 0}
 
   def start_link(_) do
     GenServer.start_link(__MODULE__, @initial_state, name: __MODULE__)
@@ -25,15 +25,6 @@ defmodule Pleroma.JobQueueMonitor do
     GenServer.call(__MODULE__, :stats)
   end
 
-  def enqueue({:ok, job}) do
-    meta = Map.take(job, [:args, :queue, :worker])
-    GenServer.cast(__MODULE__, {:process_enqueue, meta})
-
-    {:ok, job}
-  end
-
-  def enqueue(result), do: result
-
   def handle_event([:oban, status], %{duration: duration}, meta, _) do
     GenServer.cast(__MODULE__, {:process_event, status, duration, meta})
   end
@@ -43,24 +34,6 @@ defmodule Pleroma.JobQueueMonitor do
     {:reply, state, state}
   end
 
-  def handle_cast({:process_enqueue, meta}, state) do
-    state =
-      state
-      |> Map.update!(:workers, fn workers ->
-        workers
-        |> Map.put_new(meta.worker, %{})
-        |> Map.update!(meta.worker, &update_worker(&1, :enqueue, meta))
-      end)
-      |> Map.update!(:queues, fn workers ->
-        workers
-        |> Map.put_new(meta.queue, @queue)
-        |> Map.update!(meta.queue, fn queue -> Map.update!(queue, :enqueued, &(&1 + 1)) end)
-      end)
-      |> Map.update!(:enqueued, &(&1 + 1))
-
-    {:noreply, state}
-  end
-
   @impl true
   def handle_cast({:process_event, status, duration, meta}, state) do
     state =
@@ -76,12 +49,11 @@ defmodule Pleroma.JobQueueMonitor do
         |> Map.update!(meta.queue, &update_queue(&1, status, meta, duration))
       end)
       |> Map.update!(:processed_jobs, &(&1 + 1))
-      |> decr_enqueued()
 
     {:noreply, state}
   end
 
-  defp update_worker(worker, status, meta, duration \\ 0) do
+  defp update_worker(worker, status, meta, duration) do
     worker
     |> Map.put_new(meta.args["op"], @operation)
     |> Map.update!(meta.args["op"], &update_op(&1, status, meta, duration))
@@ -96,20 +68,11 @@ defmodule Pleroma.JobQueueMonitor do
     op
     |> Map.update!(:processed_jobs, &(&1 + 1))
     |> Map.update!(status, &(&1 + 1))
-    |> decr_enqueued()
   end
 
   defp update_queue(queue, status, _meta, _duration) do
     queue
     |> Map.update!(:processed_jobs, &(&1 + 1))
     |> Map.update!(status, &(&1 + 1))
-    |> decr_enqueued()
-  end
-
-  defp decr_enqueued(map) do
-    Map.update!(map, :enqueued, fn
-      0 -> 0
-      enqueued -> enqueued - 1
-    end)
   end
 end
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
index a43ce8bc0..358efa14a 100644
--- a/lib/pleroma/workers/worker_helper.ex
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -40,7 +40,6 @@ defmodule Pleroma.Workers.WorkerHelper do
         unquote(caller_module)
         |> apply(:new, [params, worker_args])
         |> Pleroma.Repo.insert()
-        |> Pleroma.JobQueueMonitor.enqueue()
       end
     end
   end

From 93f966ea4bb2b4d551cb3e248150809554deddc8 Mon Sep 17 00:00:00 2001
From: Egor Kislitsyn <egor@kislitsyn.com>
Date: Wed, 2 Oct 2019 14:51:30 +0700
Subject: [PATCH 3/4] Update CHANGELOG and pleroma_api.md

---
 CHANGELOG.md            | 1 +
 docs/api/pleroma_api.md | 9 +++++----
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1a76e6cf8..0a88f9d75 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 ## [Unreleased]
 ### Added
 - Refreshing poll results for remote polls
+- Job queue stats to the healthcheck page
 - Admin API: Add ability to require password reset
 
 ### Changed
diff --git a/docs/api/pleroma_api.md b/docs/api/pleroma_api.md
index a469ddfbf..dfd6623ec 100644
--- a/docs/api/pleroma_api.md
+++ b/docs/api/pleroma_api.md
@@ -317,7 +317,8 @@ See [Admin-API](Admin-API.md)
   "active": 0, # active processes
   "idle": 0, # idle processes
   "memory_used": 0.00, # Memory used
-  "healthy": true # Instance state
+  "healthy": true, # Instance state
+  "job_queue_stats": {} # Job queue stats
 }
 ```
 
@@ -391,7 +392,7 @@ The status posting endpoint takes an additional parameter, `in_reply_to_conversa
 ### Update a file in a custom emoji pack
 * Method `POST`
 * Authentication: required
-* Params: 
+* Params:
     * if the `action` is `add`, adds an emoji named `shortcode` to the pack `pack_name`,
       that means that the emoji file needs to be uploaded with the request
       (thus requiring it to be a multipart request) and be named `file`.
@@ -408,7 +409,7 @@ The status posting endpoint takes an additional parameter, `in_reply_to_conversa
 ### Updates (replaces) pack metadata
 * Method `POST`
 * Authentication: required
-* Params: 
+* Params:
   * `new_data`: new metadata to replace the old one
 * Response: JSON, updated "metadata" section of the pack and 200 status or 400 if there was a
   problem with the new metadata (the error is specified in the "error" part of the response JSON)
@@ -417,7 +418,7 @@ The status posting endpoint takes an additional parameter, `in_reply_to_conversa
 ### Requests the instance to download the pack from another instance
 * Method `POST`
 * Authentication: required
-* Params: 
+* Params:
   * `instance_address`: the address of the instance to download from
   * `pack_name`: the pack to download from that instance
 * Response: JSON, "ok" and 200 status if the pack was downloaded, or 500 if there were

From 0fc29deba06b6a897f3534ce68abfdadcab12a6b Mon Sep 17 00:00:00 2001
From: Egor Kislitsyn <egor@kislitsyn.com>
Date: Wed, 2 Oct 2019 15:24:21 +0700
Subject: [PATCH 4/4] Add tests for Pleroma.JobQueueMonitor

---
 test/job_queue_monitor_test.exs | 70 +++++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)
 create mode 100644 test/job_queue_monitor_test.exs

diff --git a/test/job_queue_monitor_test.exs b/test/job_queue_monitor_test.exs
new file mode 100644
index 000000000..17c6f3246
--- /dev/null
+++ b/test/job_queue_monitor_test.exs
@@ -0,0 +1,70 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.JobQueueMonitorTest do
+  use ExUnit.Case, async: true
+
+  alias Pleroma.JobQueueMonitor
+
+  @success {:process_event, :success, 1337,
+            %{
+              args: %{"op" => "refresh_subscriptions"},
+              attempt: 1,
+              id: 339,
+              max_attempts: 5,
+              queue: "federator_outgoing",
+              worker: "Pleroma.Workers.SubscriberWorker"
+            }}
+
+  @failure {:process_event, :failure, 22_521_134,
+            %{
+              args: %{"op" => "force_password_reset", "user_id" => "9nJG6n6Nbu7tj9GJX6"},
+              attempt: 1,
+              error: %RuntimeError{message: "oops"},
+              id: 345,
+              kind: :exception,
+              max_attempts: 1,
+              queue: "background",
+              stack: [
+                {Pleroma.Workers.BackgroundWorker, :perform, 2,
+                 [file: 'lib/pleroma/workers/background_worker.ex', line: 31]},
+                {Oban.Queue.Executor, :safe_call, 1,
+                 [file: 'lib/oban/queue/executor.ex', line: 42]},
+                {:timer, :tc, 3, [file: 'timer.erl', line: 197]},
+                {Oban.Queue.Executor, :call, 2, [file: 'lib/oban/queue/executor.ex', line: 23]},
+                {Task.Supervised, :invoke_mfa, 2, [file: 'lib/task/supervised.ex', line: 90]},
+                {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}
+              ],
+              worker: "Pleroma.Workers.BackgroundWorker"
+            }}
+
+  test "stats/0" do
+    assert %{processed_jobs: _, queues: _, workers: _} = JobQueueMonitor.stats()
+  end
+
+  test "handle_cast/2" do
+    state = %{workers: %{}, queues: %{}, processed_jobs: 0}
+
+    assert {:noreply, state} = JobQueueMonitor.handle_cast(@success, state)
+    assert {:noreply, state} = JobQueueMonitor.handle_cast(@failure, state)
+    assert {:noreply, state} = JobQueueMonitor.handle_cast(@success, state)
+    assert {:noreply, state} = JobQueueMonitor.handle_cast(@failure, state)
+
+    assert state == %{
+             processed_jobs: 4,
+             queues: %{
+               "background" => %{failure: 2, processed_jobs: 2, success: 0},
+               "federator_outgoing" => %{failure: 0, processed_jobs: 2, success: 2}
+             },
+             workers: %{
+               "Pleroma.Workers.BackgroundWorker" => %{
+                 "force_password_reset" => %{failure: 2, processed_jobs: 2, success: 0}
+               },
+               "Pleroma.Workers.SubscriberWorker" => %{
+                 "refresh_subscriptions" => %{failure: 0, processed_jobs: 2, success: 2}
+               }
+             }
+           }
+  end
+end