среда, 22 апреля 2015 г.

Server Sent Events в Ruby on Rails. Прохождение

В Rails начиная в 4 версии есть поддержка так называемого Live Streaming, позволяющего реализовать Server-Sent Events. По сути своей это костыль, позволяющий держать TCP-соединение открытым не завершая HTTP-транзакцию так что сервер всегда может инициировать отправку данных клиенту по установленному клиентом соединению. Инициируется соединение клиентом как HTTP-запрос, но в ответ сервер отправляет порцию хедеров с Content-Type: text/event-stream и не закрывает соединение. Такой простой механизм лежит в основе Server-Sent Events.
В сети есть несколько тьюториалов по использовании этого в RoR приложении, но они весьма простые и не затрагивают некоторые важные подводные камни, с которыми я столкнулся при написании feature branches build server - приложения подобного continuous integration, но с особенностью не важной для данной записи. О неудобствах, подводных камнях и моём их решении ниже.


Первое неудобство - это неприспособленность RoR для сценария когда сервер отдаёт клиенту данные без запроса.
Обычный сценарий:
  1. запрос клиента (контроллер)
  2. обработка (модель)
  3. ответ клиенту (контроллер)
В случае с SSE:
  1. запрос клиента (контроллер)
  2. обработка (контроллер/модель)
  3. ожидание данных (модель)
  4. отправка данных клиенту (контроллер)
  5. goto 2 unless error
В классическом случае контроллер получает данные у модели и возвращает клиенту. В случае с SSE контроллер принимает запрос клиента (в отдельном потоке, но об этом позже) и ждёт пока на уровне модели появятся данные, которые можно отправить этому клиенту. Это плохо сочетается с MVC.

Моя реализация такого механизма выглядит примерно так.
Отдельный контроллер, используемый только для SSE:
class LiveUpdatesController < ApplicationController
  include ActionController::Live
 
  def build_jobs
    Rails.logger.info "Starting SSE for new client"
    response.headers['Content-Type'] = 'text/event-stream'
    sse = SSE.new(response.stream, retry: 60, event: "update_build_jobs")
    build_jobs_queue = QueueWithTimeout.new
    Timeout::timeout(14400) do # kick the client after 4 hours
      BuildJob.on_change(build_jobs_queue) do
        begin
          build_job = build_jobs_queue.pop_with_timeout(30) # every 30 seconds send keep live packets to determine disconnects
          sse.write(params_for_build_job(build_job))
        rescue ThreadError # pop timeout - send keep alive
          sse.write params_for_keep_alive
          retry
        rescue => err
          raise err
        end
      end
    end
  rescue ClientDisconnected, Timeout::Error => err
    Rails.logger.warn err.to_s
  ensure
    sse.close
    BuildJob.on_change_cleanup(build_jobs_queue)
    Rails.logger.info "Closed SSE stream"
  end
Экшн build_job выполняется в отдельном потоке для каждого клиента.

#1 Невозможность определить когда клиент отключился до попытки отправить ему данные

 

Например, пользователь закрыл вкладку вместе с SSE соединением, но наш сервер об этом не узнает пока не попытается что-то отправить клиенту. Если отправлять нечего, то мёртвое соединение (а вместе с ним и поток) будет висеть до бесконечности. Эксепшен ClientDisconnected бросается только в таком случае. Из-за этого приходится каждые 30 секунд слать клиенту "keep_alive" просто для возможности получить ClientDisconnected и завершить поток когда клиент отключился.

Это давно известная проблема (https://github.com/rails/rails/issues/10989) и именно так её и костыляют. Так же на всякий случай сделан принудительный кик клиента через 4 часа. Просто для дополнительной профилактики мёртвых клиентов. Если клиент жив, то он переподключится (параметр retry в конструкторе SSE есть таймаут переподключения в секундах).

Модель BuildJob, в которой происходят обновления данных о которых надо сообщать клиентам:
class BuildJob < ActiveRecord::Base
  after_save :notify
 
  @@notify_queues = []
  @@notify_queues_mutex = Mutex.new
  class << self
    def on_change(queue)
      @@notify_queues_mutex.synchronize do
        @@notify_queues << queue
      end
      loop do
        @@notify_queues.each do
          yield
        end
      end
    end
  end
 
  def self.on_change_cleanup(queue)
    @@notify_queues_mutex.synchronize do
      @@notify_queues.delete(queue)
    end
  end

  private
  
    def notify
      @@notify_queues.each do |q|
        q.push self
      end
    end
   
end
Обычно используют Redis или иные PubSub решения (в PostgreSQL даже есть), но для меня это было бы оверкиллом поэтому решил сделать так. Контроллер создаёт очередь, передаёт её в модель и дальше блокирует поток пока в ней появятся данные. А данные там появятся когда в модели сработает after_save коллбэк. В нём все очереди, переданные контроллерами заполняются новыми данными после чего контроллеры их получают из очередей и отдают клиентам.

Для реализации keep_alive потребовалась очередь с таймаутом. Её реализация тривиальна и не приводится для краткости (полный пример можно найти тут https://bitbucket.org/antlabs_dev/fbbs2).

#2 Особенности работы в Google Chrome

 

На клиентской стороне происходит нижеследующее:
connect_sse = ->
  console.log "Trying to start SSE..."
  if(gon? && gon.build_jobs_live_updates_path?)
    path = gon.build_jobs_live_updates_path
    console.log "SSE path " + path
    if window.event_source
      console.log "Stopping SSE before restarting"
      window.event_source.close()
    source = new EventSource(path)
    window.event_source = source
   
    $(document).on 'page:before-change', ->
       console.log "Stopping SSE on page change"
       source.close()
   
    source.onerror = ->
      console.log "Error occured while listening SSE"
   
    source.addEventListener "update_build_jobs", (event) ->
      if event.data == 'keep_alive'
        console.log 'keep_alive'
      else
        onevent event
  else
    console.log "Error starting SSE" 

onevent = (event) ->
  json = JSON.parse(event.data)
  console.log json
  refresh_page json
 
refresh_page = (json) ->
  if window.event_source
    console.log "Stopping SSE before reloading page contents"
    window.event_source.close()
  #disable page scrolling to top after loading page content
  Turbolinks.enableTransitionCache(true)
  # pass current page url to visit method
  Turbolinks.visit(location.toString())
  #enable page scroll reset in case user clicks other link
  Turbolinks.enableTransitionCache(false)
   
ready = ->
  setTimeout (-> connect_sse()), 1000
       
$(document).ready(ready)
$(document).on('page:load', ready) # with turbolinks it causes multiple connection sse when walking across pages with sse
Тут используется gon (https://github.com/gazay/gon) для передачи URL в JS. Если у Вас URL для SSE постоянный, то gon не нужен.
Оказалось что в Chrome соедиенения SSE не закрываются при обновлении страницы с помощью Turbolinks (Turbolinks.visit)или просто при переходах по ссылкам. Таким образом быстро достигается лимит в 6 подключений на 1 домен и всё напрочь перестаёт работать, могает только закрытие вкладки и повторное открытие. И даже закрытие соединения в page:before-change не помогает. Возможно не повезло только мне, но пришлось сохранять объект EventSource в window и принудительно закрывать соединение на нём если он существует перед открытием нового. 
 
И в заключении о настройке nginx для работы с puma и SSE.

upstream puma {
  server unix:///srv/www/fbbs2/control/tmp/sockets/puma.sock;
}

server {
  listen 80;
  server_name example.com;

  root /srv/www/fbbs2/control/public;

  location ^~ /assets/ {
    gzip_static on;
    expires max;
    add_header Cache-Control public;
  }

  try_files $uri/index.html $uri @puma;
  location @puma {
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header Host $http_host;
    proxy_redirect off;

    proxy_pass http://puma;
   
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding off;
    #proxy_set_header X-Accel-Buffering no;
  }

  error_page 500 502 503 504 /500.html;
  client_max_body_size 1000M;
  keepalive_timeout 10;
}
Ключеые строки:
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header Host $http_host;
    proxy_redirect off;


Это необходимо для работы SSE.

Так же незабываем что необходим многопоточный севрер (например, puma), на каждого SSE клиента создаётся поток (соответственно нужно запускать сервер с макмиальным кол-вом потоков не менее ожидаемого максимального количества одновременных плиентов. На каждый поток, работающий с базой будет использовано отдельное подключение к базе, поэтому незабываем устанавливать pool в database.yml равным максимальному количеству потоков.

Полные исходники проекта с SSE https://bitbucket.org/antlabs_dev/fbbs2






Комментариев нет:

Отправить комментарий