В Rails начиная в 4 версии есть поддержка так называемого Live Streaming, позволяющего реализовать Server-Sent Events. По сути своей это костыль, позволяющий держать TCP-соединение открытым не завершая HTTP-транзакцию так что сервер всегда может инициировать отправку данных клиенту по установленному клиентом соединению. Инициируется соединение клиентом как HTTP-запрос, но в ответ сервер отправляет порцию хедеров с Content-Type: text/event-stream и не закрывает соединение. Такой простой механизм лежит в основе Server-Sent Events.
В сети есть несколько тьюториалов по использовании этого в RoR приложении, но они весьма простые и не затрагивают некоторые важные подводные камни, с которыми я столкнулся при написании feature branches build server - приложения подобного continuous integration, но с особенностью не важной для данной записи. О неудобствах, подводных камнях и моём их решении ниже.
Первое неудобство - это неприспособленность RoR для сценария когда сервер отдаёт клиенту данные без запроса.
Обычный сценарий:
Моя реализация такого механизма выглядит примерно так.
Отдельный контроллер, используемый только для SSE:
Это давно известная проблема (https://github.com/rails/rails/issues/10989) и именно так её и костыляют. Так же на всякий случай сделан принудительный кик клиента через 4 часа. Просто для дополнительной профилактики мёртвых клиентов. Если клиент жив, то он переподключится (параметр retry в конструкторе SSE есть таймаут переподключения в секундах).
Модель BuildJob, в которой происходят обновления данных о которых надо сообщать клиентам:
Для реализации keep_alive потребовалась очередь с таймаутом. Её реализация тривиальна и не приводится для краткости (полный пример можно найти тут https://bitbucket.org/antlabs_dev/fbbs2).
Оказалось что в Chrome соедиенения SSE не закрываются при обновлении страницы с помощью Turbolinks (Turbolinks.visit)или просто при переходах по ссылкам. Таким образом быстро достигается лимит в 6 подключений на 1 домен и всё напрочь перестаёт работать, могает только закрытие вкладки и повторное открытие. И даже закрытие соединения в page:before-change не помогает. Возможно не повезло только мне, но пришлось сохранять объект EventSource в window и принудительно закрывать соединение на нём если он существует перед открытием нового.
И в заключении о настройке nginx для работы с puma и SSE.
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_redirect off;
Это необходимо для работы SSE.
Так же незабываем что необходим многопоточный севрер (например, puma), на каждого SSE клиента создаётся поток (соответственно нужно запускать сервер с макмиальным кол-вом потоков не менее ожидаемого максимального количества одновременных плиентов. На каждый поток, работающий с базой будет использовано отдельное подключение к базе, поэтому незабываем устанавливать pool в database.yml равным максимальному количеству потоков.
Полные исходники проекта с SSE https://bitbucket.org/antlabs_dev/fbbs2
В сети есть несколько тьюториалов по использовании этого в RoR приложении, но они весьма простые и не затрагивают некоторые важные подводные камни, с которыми я столкнулся при написании feature branches build server - приложения подобного continuous integration, но с особенностью не важной для данной записи. О неудобствах, подводных камнях и моём их решении ниже.
Первое неудобство - это неприспособленность RoR для сценария когда сервер отдаёт клиенту данные без запроса.
Обычный сценарий:
- запрос клиента (контроллер)
- обработка (модель)
- ответ клиенту (контроллер)
- запрос клиента (контроллер)
- обработка (контроллер/модель)
- ожидание данных (модель)
- отправка данных клиенту (контроллер)
- goto 2 unless error
Моя реализация такого механизма выглядит примерно так.
Отдельный контроллер, используемый только для SSE:
class LiveUpdatesController < ApplicationControllerЭкшн build_job выполняется в отдельном потоке для каждого клиента.
include ActionController::Live
def build_jobs
Rails.logger.info "Starting SSE for new client"
response.headers['Content-Type'] = 'text/event-stream'
sse = SSE.new(response.stream, retry: 60, event: "update_build_jobs")
build_jobs_queue = QueueWithTimeout.new
Timeout::timeout(14400) do # kick the client after 4 hours
BuildJob.on_change(build_jobs_queue) do
begin
build_job = build_jobs_queue.pop_with_timeout(30) # every 30 seconds send keep live packets to determine disconnects
sse.write(params_for_build_job(build_job))
rescue ThreadError # pop timeout - send keep alive
sse.write params_for_keep_alive
retry
rescue => err
raise err
end
end
end
rescue ClientDisconnected, Timeout::Error => err
Rails.logger.warn err.to_s
ensure
sse.close
BuildJob.on_change_cleanup(build_jobs_queue)
Rails.logger.info "Closed SSE stream"
end
#1 Невозможность определить когда клиент отключился до попытки отправить ему данные
Например, пользователь закрыл вкладку вместе с SSE соединением, но наш сервер об этом не узнает пока не попытается что-то отправить клиенту. Если отправлять нечего, то мёртвое соединение (а вместе с ним и поток) будет висеть до бесконечности. Эксепшен ClientDisconnected бросается только в таком случае. Из-за этого приходится каждые 30 секунд слать клиенту "keep_alive" просто для возможности получить ClientDisconnected и завершить поток когда клиент отключился.
Это давно известная проблема (https://github.com/rails/rails/issues/10989) и именно так её и костыляют. Так же на всякий случай сделан принудительный кик клиента через 4 часа. Просто для дополнительной профилактики мёртвых клиентов. Если клиент жив, то он переподключится (параметр retry в конструкторе SSE есть таймаут переподключения в секундах).
Модель BuildJob, в которой происходят обновления данных о которых надо сообщать клиентам:
class BuildJob < ActiveRecord::BaseОбычно используют Redis или иные PubSub решения (в PostgreSQL даже есть), но для меня это было бы оверкиллом поэтому решил сделать так. Контроллер создаёт очередь, передаёт её в модель и дальше блокирует поток пока в ней появятся данные. А данные там появятся когда в модели сработает after_save коллбэк. В нём все очереди, переданные контроллерами заполняются новыми данными после чего контроллеры их получают из очередей и отдают клиентам.
after_save :notify
@@notify_queues = []
@@notify_queues_mutex = Mutex.new
class << self
def on_change(queue)
@@notify_queues_mutex.synchronize do
@@notify_queues << queue
end
loop do
@@notify_queues.each do
yield
end
end
end
end
def self.on_change_cleanup(queue)
@@notify_queues_mutex.synchronize do
@@notify_queues.delete(queue)
end
end
private
def notify
@@notify_queues.each do |q|
q.push self
end
end
end
Для реализации keep_alive потребовалась очередь с таймаутом. Её реализация тривиальна и не приводится для краткости (полный пример можно найти тут https://bitbucket.org/antlabs_dev/fbbs2).
#2 Особенности работы в Google Chrome
На клиентской стороне происходит нижеследующее:
connect_sse = ->Тут используется gon (https://github.com/gazay/gon) для передачи URL в JS. Если у Вас URL для SSE постоянный, то gon не нужен.
console.log "Trying to start SSE..."
if(gon? && gon.build_jobs_live_updates_path?)
path = gon.build_jobs_live_updates_path
console.log "SSE path " + path
if window.event_source
console.log "Stopping SSE before restarting"
window.event_source.close()
source = new EventSource(path)
window.event_source = source
$(document).on 'page:before-change', ->
console.log "Stopping SSE on page change"
source.close()
source.onerror = ->
console.log "Error occured while listening SSE"
source.addEventListener "update_build_jobs", (event) ->
if event.data == 'keep_alive'
console.log 'keep_alive'
else
onevent event
else
console.log "Error starting SSE"
onevent = (event) ->
json = JSON.parse(event.data)
console.log json
refresh_page json
refresh_page = (json) ->
if window.event_source
console.log "Stopping SSE before reloading page contents"
window.event_source.close()
#disable page scrolling to top after loading page content
Turbolinks.enableTransitionCache(true)
# pass current page url to visit method
Turbolinks.visit(location.toString())
#enable page scroll reset in case user clicks other link
Turbolinks.enableTransitionCache(false)
ready = ->
setTimeout (-> connect_sse()), 1000
$(document).ready(ready)
$(document).on('page:load', ready) # with turbolinks it causes multiple connection sse when walking across pages with sse
Оказалось что в Chrome соедиенения SSE не закрываются при обновлении страницы с помощью Turbolinks (Turbolinks.visit)или просто при переходах по ссылкам. Таким образом быстро достигается лимит в 6 подключений на 1 домен и всё напрочь перестаёт работать, могает только закрытие вкладки и повторное открытие. И даже закрытие соединения в page:before-change не помогает. Возможно не повезло только мне, но пришлось сохранять объект EventSource в window и принудительно закрывать соединение на нём если он существует перед открытием нового.
И в заключении о настройке nginx для работы с puma и SSE.
upstream puma {Ключеые строки:
server unix:///srv/www/fbbs2/control/tmp/sockets/puma.sock;
}
server {
listen 80;
server_name example.com;
root /srv/www/fbbs2/control/public;
location ^~ /assets/ {
gzip_static on;
expires max;
add_header Cache-Control public;
}
try_files $uri/index.html $uri @puma;
location @puma {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_redirect off;
proxy_pass http://puma;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
#proxy_set_header X-Accel-Buffering no;
}
error_page 500 502 503 504 /500.html;
client_max_body_size 1000M;
keepalive_timeout 10;
}
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_redirect off;
Это необходимо для работы SSE.
Так же незабываем что необходим многопоточный севрер (например, puma), на каждого SSE клиента создаётся поток (соответственно нужно запускать сервер с макмиальным кол-вом потоков не менее ожидаемого максимального количества одновременных плиентов. На каждый поток, работающий с базой будет использовано отдельное подключение к базе, поэтому незабываем устанавливать pool в database.yml равным максимальному количеству потоков.
Полные исходники проекта с SSE https://bitbucket.org/antlabs_dev/fbbs2
Комментариев нет:
Отправить комментарий