diff --git a/zmq.ipynb b/zmq.ipynb new file mode 100644 index 0000000..4f8cc0a --- /dev/null +++ b/zmq.ipynb @@ -0,0 +1,154 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "import MsgPack\n", + "using ZMQ\n", + "import ZMQ: send" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "data": { + "text/plain": [ + "send (generic function with 10 methods)" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "type LazyPirate\n", + " ctx::Context\n", + " socket::Socket\n", + " server_endpoint::String\n", + "end\n", + "\n", + "function send(pirate::LazyPirate, msg)\n", + " while true\n", + " send(pirate.socket, Message(msg))\n", + " \n", + " c = Channel()\n", + " @async try\n", + " put!(c, (ZMQ.recv(pirate.socket), :received))\n", + " catch e\n", + " put!(c, (e, :errored))\n", + " end\n", + " @async begin\n", + " sleep(1)\n", + " put!(c, (nothing, :timedout))\n", + " end\n", + " data, status = take!(c)\n", + " \n", + " if status == :errored\n", + " rethrow(data)\n", + " elseif status == :timedout\n", + " ZMQ.set_linger(pirate.socket, 0)\n", + " ZMQ.close(pirate.socket)\n", + " pirate.socket = Socket(pirate.ctx, REQ)\n", + " ZMQ.connect(pirate.socket, pirate.server_endpoint) \n", + " else # status == :received\n", + " break\n", + " end\n", + " end\n", + "end\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "ctx = Context()\n", + "s = Socket(ctx, REQ)\n", + "ZMQ.connect(s, \"tcp://localhost:5555\")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "ZMQ.send(s, Message(\"Hello\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "data": { + "text/plain": [ + "LazyPirate(ZMQ.Context(Ptr{Void} @0x00000000024d7eb0,ZMQ.Socket[ZMQ.Socket(Ptr{Void} @0x0000000003e2e780,Base.Filesystem._FDWatcher(Ptr{Void} @0x0000000002b03f10,(1,0),Condition(Any[]),(false,false)))]),ZMQ.Socket(Ptr{Void} @0x0000000003e2e780,Base.Filesystem._FDWatcher(Ptr{Void} @0x0000000002b03f10,(1,0),Condition(Any[]),(false,false))),\"tcp://localhost:5555\")" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pirate = LazyPirate(ctx, s, \"tcp://localhost:5555\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "send(pirate, MsgPack.pack(\"hello world\"))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Julia 0.5.0", + "language": "julia", + "name": "julia-0.5" + }, + "language_info": { + "file_extension": ".jl", + "mimetype": "application/julia", + "name": "julia", + "version": "0.5.0" + } + }, + "nbformat": 4, + "nbformat_minor": 1 +} diff --git a/zmq2.ipynb b/zmq2.ipynb new file mode 100644 index 0000000..9b2b265 --- /dev/null +++ b/zmq2.ipynb @@ -0,0 +1,71 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "import MsgPack\n", + "using ZMQ" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "ctx = Context()\n", + "s = Socket(ctx, REP)\n", + "ZMQ.bind(s, \"tcp://*:5555\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "m = \"hello\"\n", + "m = \"hello\"\n", + "m = \"hello\"\n", + "m = \"hello\"\n", + "m = \"hello world\"\n" + ] + } + ], + "source": [ + "while true\n", + " m = MsgPack.unpack(read(seek(convert(IOStream, ZMQ.recv(s)), 0)))\n", + " @show m\n", + " ZMQ.send(s, Message(MsgPack.pack(\"ok\")))\n", + "end" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Julia 0.5.0", + "language": "julia", + "name": "julia-0.5" + }, + "language_info": { + "file_extension": ".jl", + "mimetype": "application/julia", + "name": "julia", + "version": "0.5.0" + } + }, + "nbformat": 4, + "nbformat_minor": 1 +}