从Clojure向ClojureScript的服务器推送数据

10

我正在使用Clojure编写一个应用程序服务器,客户端将使用ClojureScript。

我想找到一种高效、符合惯例的方法,以实时事件形式从服务器推送数据到客户端,最好是使用以下某种组合:

  • http-kit
  • core.async
  • Ring

(但我也可以接受其他可能性)

有人能提供一个好的示例/方法来完成这个任务吗?


请参考 https://github.com/sunng87/ring-jetty9-adapter 和 http-kit,它们也支持ws。 - edbond
Websockets支持 = http://caniuse.com/websockets http-kit文档: http://http-kit.org/server.html#channel 当你收到消息时,将其放入通道中,然后完成。 - edbond
3个回答

6
我喜欢使用aleph,这里是wiki,您可以使用wrap-ring-handler函数简单地包装现有的处理程序。
对于“push”函数,最有用的部分是aleph的异步处理程序。它是在netty之上构建的,不是一次连接一个线程模型,因此服务器端不需要担心tcp连接数。
一些实现细节:
  • 服务器端使用异步处理程序,持有所有客户端连接(通道)
  • 在60(例如)秒内,如果没有“新数据”,则发送空响应
  • 如果服务器端有响应,则发送响应。
  • 客户端可以简单地向服务器发送普通的http请求
  • 当客户端获得响应后,处理响应体,然后再次发送http请求
  • 请检查客户端和所有代理服务器以设置正确的超时值
这里还有更多方法:http://en.wikipedia.org/wiki/Push_technology

5

最近我一直在尝试使用库 Chord,我非常喜欢它。

它提供了一个小的 core.async 包装器,围绕着 http-kit 中的 Websocket 支持。

从 Github 页面上看:

在服务器上

(:require [chord.http-kit :refer [with-channel]]
          [clojure.core.async :refer [<! >! put! close! go]])

(defn your-handler [req]
  (with-channel req ws-ch
    (go
      (let [{:keys [message]} (<! ws-ch)]
        (println "Message received:" message)
        (>! ws-ch "Hello client from server!")
        (close! ws-ch)))))

在客户端

(:require [chord.client :refer [ws-ch]]
          [cljs.core.async :refer [<! >! put! close!]])
(:require-macros [cljs.core.async.macros :refer [go]])

(go
  (let [ws (<! (ws-ch "ws://localhost:3000/ws"))]
    (>! ws "Hello server from client!")))

我认为它仍处于早期阶段 - 它还不能处理断开连接的情况。


4
我正在开发一个项目,其中有与您完全相同的要求。我使用了基座服务与core.async相结合来实现服务器发送事件(SSE),并且运行得非常好。
不幸的是,我现在不能公开这个工作,但基本上,我做了类似下面片段的事情,只是由于身份验证而更加复杂。(从浏览器进行SSE身份验证并不特别容易,因为您无法在new EventSource(SOME_URI);中传递任何自定义标头。)
因此,以下是示例片段:
(ns chat-service.service
  (:require [clojure.set :as set]
            [clojure.core.async :as async :refer [<!! >!! <! >!]]
            [cheshire.core :as json]
            [io.pedestal.service.http :as bootstrap]
            [io.pedestal.service.log :as log]
            [io.pedestal.service.http.route :as route]
            [io.pedestal.service.http.sse :as sse]
            [io.pedestal.service.http.route.definition :refer [defroutes]]))

(def ^{:private true :doc "Formatting opts"} json-opts {:date-format "MMM dd, yyyy HH:mm:ss Z"})

(def ^{:private true :doc "Users to notification channels"} subscribers->notifications (atom {}))

 ;; private helper functions
(def ^:private generate-id #(.toString (java.util.UUID/randomUUID)))

(defn- sse-msg [event msg-data]
  {:event event :msg msg-data})

;; service functions    
(defn- remove-subscriber
  "Removes transport channel from atom subscribers->notifications and tears down
   SSE connection."
  [transport-channel context]
  (let [subscriber (get (set/map-invert @subscribers->notifications) transport-channel)]
    (log/info :msg (str "Removing SSE connection for subscriber with ID : " subscriber))
    (swap! subscribers->notifications dissoc subscriber)
    (sse/end-event-stream context)))

(defn send-event
  "Sends updates via SSE connection, takes also transport channel to close it
   in case of the exception."
  [transport-channel context {:keys [event msg]}]
  (try
    (log/info :msg "calling event sending fn")
    (sse/send-event context event (json/generate-string msg json-opts))
    (catch java.io.IOException ioe
      (async/close! transport-channel))))

(defn create-transport-channel
  "Creates transport channel with receiving end pushing updates to SSE connection.
   Associates this transport channel in atom subscribers->notifications under random
   generated UUID."
  [context]
  (let [temporary-id (generate-id)
        channel (async/chan)]
    (swap! subscribers->notifications assoc temporary-id channel)
    (async/go-loop []
      (when-let [payload (<! channel)]
        (send-event channel context payload)
        (recur))
      (remove-subscriber channel context))
    (async/put! channel (sse-msg "eventsourceVerification"
                                 {:handshakeToken temporary-id}))))

(defn subscribe
  "Subscribes anonymous user to SSE connection. Transport channel with timeout set up
   will be created for pushing any new data to this connection."
  [context]
  (create-transport-channel context))

(defroutes routes
  [[["/notifications/chat"
     {:get [::subscribe (sse/start-event-stream subscribe)]}]]])

(def service {:env :prod
              ::bootstrap/routes routes
              ::bootstrap/resource-path "/public"
              ::bootstrap/type :jetty
              ::bootstrap/port 8081})

我遇到的一个问题是Pedestal默认处理断开的SSE连接的方式。

因为有定期的心跳作业,所以每当连接断开时,并且您没有调用 end-event-stream 上下文,它就会记录异常。

我希望有一种方法可以禁用/调整此行为,或者至少提供自己的拆除函数,每当心跳作业由于 EofException 失败时将调用该函数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接