Thursday, February 11, 2010

gen_client example walk-through

In this post I will show and comment on the code for the XMPP bot that solves the problem of temporary subscriptions. Please see the previous post for an explanation of the problem.

The code uses the gen_client behaviour, an extension of the exmpp library. You can find the source code for gen_client here.

First, we start the client process using one of the variations of gen_client:start (the first line of tidy_subscriptions/5). The last two arguments are the callback module and its arguments. The module must implement the gen_client behaviour, and that's how you create your own client logic.
This example uses the dummy_client module, which we could consider a "next to minimal" implementation of gen_client: all it does is send "available" presence upon start (the run/2 callback) and "unavailable" presence upon termination (the stop/1 callback). How useful is this? The answer: we can add functionality to the running client at any time, using either add_handler/2 or the variations of send_sync_packet with attached triggers.

Let's look at the code for some examples:

In tidy_subscriptions/5 we dynamically add a handler, JidOfflineHandler, that monitors JIDs going offline. Once a JID sends "unavailable" presence, the handler cancels all of that JID's subscriptions.
As long as tidy_bot is on duty, there will be no mess anymore!

Next comes the process_subscriptions/3 call in tidy_subscriptions/5: we also want to clean up the mess that may have been created while tidy_bot wasn't online. So we obtain a list of subscriptions using a synchronous request (the send_sync_packet/3 call in process_subscriptions/3), and for every subscription a presence probe is sent to its owner (the send_sync_packet/4 call in unsubscribe_temporary/5). Subscriptions whose owners don't respond to the probe are canceled. The probe request is a synchronous request with a trigger function.
Note how we receive the responses to both synchronous requests in the same process that sent them.
Because synchronous calls are a distinctive feature of gen_client, let me explain them in more detail:

The variation send_sync_packet/4 uses a "trigger function" (the 3rd argument), which is applied to each incoming stanza (together with the client state) and returns true if that stanza carries the relevant response. This ends the processing of the synchronous request, and the calling process receives {ok, IncomingStanza}. If, on the other hand, no "triggering" stanza arrives within the timeout period (the 4th argument of send_sync_packet/4), the calling process receives the atom timeout.
In our case, the trigger function defined in unsubscribe_temporary/5 expects to see "available" presence from the JID we've sent the probe to. If this happens within the timeout interval (4 seconds, as set by this send_sync_packet call), the calling process receives {ok, PresenceMessage}; otherwise it receives timeout.
Note that because of the way the trigger function in this particular case was constructed, the calling process doesn't even need to analyze the message itself.
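To make the {ok, Stanza} / timeout contract concrete, here is a minimal, self-contained model of how a caller could wait on a trigger. It is a sketch under assumptions, not gen_client's implementation: the module name sync_trigger_sketch, the {stanza, Stanza} message format and the map-based stanzas are hypothetical stand-ins for exmpp's #received_packet{} records, and the trigger takes only the stanza (the real trigger in the listing also receives the client state).

```erlang
-module(sync_trigger_sketch).
-export([wait_for/2, demo/0]).

%% Hypothetical model of the send_sync_packet/4 contract: wait until
%% Trigger(Stanza) returns true for some {stanza, Stanza} message in our
%% mailbox ({ok, Stanza}), or until TimeoutMs has elapsed (timeout).
wait_for(Trigger, TimeoutMs) ->
    Deadline = erlang:monotonic_time(millisecond) + TimeoutMs,
    loop(Trigger, Deadline).

loop(Trigger, Deadline) ->
    %% Recompute the remaining time so non-matching stanzas can't extend the wait
    Remaining = max(0, Deadline - erlang:monotonic_time(millisecond)),
    receive
        {stanza, Stanza} ->
            case Trigger(Stanza) of
                true -> {ok, Stanza};
                false -> loop(Trigger, Deadline)
            end
    after Remaining ->
            timeout
    end.

demo() ->
    Probed = "alice@example.com/web",
    %% Like the probe trigger: true only for "available" presence from Probed
    Trigger = fun(#{type := presence, type_attr := "available", from := From}) ->
                      From =:= Probed;
                 (_) -> false
              end,
    %% Simulate incoming traffic: an unrelated stanza, then the awaited presence
    self() ! {stanza, #{type => message, from => "bob@example.com/x"}},
    self() ! {stanza, #{type => presence, type_attr => "available", from => Probed}},
    {ok, #{from := Probed}} = wait_for(Trigger, 4000),
    %% Nothing matching arrives this time, so the call times out
    timeout = wait_for(Trigger, 100),
    ok.
```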

The simpler variation, send_sync_packet/3, doesn't take a trigger function; a default trigger is used instead, which simply matches the identifiers of incoming and outgoing stanzas. So send_sync_packet/3 is most appropriate for sending IQ stanzas, where the response is expected to carry the same id as the request.
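An id-matching default trigger can be sketched the same way. Again, this is an assumption about the behaviour described above, not gen_client's source: make_id_trigger/1 and the #{id := Id} map representation of a stanza are hypothetical.

```erlang
-module(id_trigger_sketch).
-export([make_id_trigger/1, demo/0]).

%% Hypothetical stand-in for the default trigger of send_sync_packet/3:
%% a stanza "triggers" when its id equals the id of the request we sent.
make_id_trigger(RequestId) ->
    fun(#{id := Id}) -> Id =:= RequestId;
       (_) -> false          % stanzas without an id never match
    end.

demo() ->
    Trigger = make_id_trigger("sub-42"),
    true = Trigger(#{type => iq, type_attr => "result", id => "sub-42"}),
    false = Trigger(#{type => iq, type_attr => "result", id => "sub-43"}),
    false = Trigger(#{type => presence}),
    ok.
```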

There is also an asynchronous request (the gen_client:send_packet/2 call in unsubscribe_from_node/4). In this particular case it's a "fire and forget" approach, i.e. we don't analyze the response. If we wanted to, we could do it by adding a handler function, as you've already seen.

It's important to note that synchronous requests only block the calling process, but not the handling of incoming stanzas. So, taking our example, the handler we have set up for monitoring "unavailable" presence will still be operational while the probe requests get sent. Moreover, every handler call spawns a separate process, so incoming stanzas don't wait for prior ones to be processed.
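The per-stanza concurrency described above can be modelled by spawning one process per handler invocation. The dispatch_sketch module below is a hypothetical illustration, not gen_client's code. It shows that a slow handler working on one stanza doesn't hold up the handling of the next one.

```erlang
-module(dispatch_sketch).
-export([dispatch/2, demo/0]).

%% Runs every handler against Stanza in its own process, so a slow
%% handler never delays the dispatch of the next stanza.
dispatch(Stanza, Handlers) ->
    lists:foreach(fun(H) -> spawn(fun() -> H(Stanza) end) end, Handlers).

demo() ->
    Parent = self(),
    Slow = fun(S) -> timer:sleep(200), Parent ! {slow_done, S} end,
    Fast = fun(S) -> Parent ! {fast_done, S} end,
    ok = dispatch(first, [Slow, Fast]),
    ok = dispatch(second, [Fast]),
    Received = [receive Msg -> Msg after 1000 -> missing end || _ <- [1, 2, 3]],
    %% The fast handler has processed both stanzas before the slow one finishes
    [{fast_done, first}, {fast_done, second}] = lists:sort(lists:sublist(Received, 2)),
    {slow_done, first} = lists:last(Received),
    ok.
```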

A little more about handlers and triggers. Internally, gen_client applies each added handler to incoming stanzas. Triggers implicitly add specialized handlers that wrap the trigger function into the appropriate call. After a handler has been applied to a stanza, gen_client decides either to keep the handler for subsequent stanzas or to dispose of it. Trigger-based handlers always get disposed of, since their purpose is to serve a single synchronous request. Any other handler is applied to all incoming stanzas until it returns the atom stop, at which point it is disposed of. You can also remove a handler explicitly with gen_client:remove_handler/2, provided you saved the handler reference returned by add_handler/2.
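The handler lifecycle can be modelled as a list of {Ref, Fun} pairs. The sketch below is hypothetical: the module and function names only mirror the description and are not gen_client's API. A handler returning stop is dropped. A wrapped trigger replies to the waiting caller and asks to be dropped. A saved reference allows explicit removal.

```erlang
-module(handler_registry_sketch).
-export([new/0, add_handler/2, remove_handler/2, apply_all/2,
         wrap_trigger/2, demo/0]).

%% Hypothetical registry: a list of {Ref, Fun} pairs.
new() -> [].

%% Returns {Ref, NewRegistry}; keep Ref if you want to remove the handler later.
add_handler(Registry, Fun) ->
    Ref = make_ref(),
    {Ref, [{Ref, Fun} | Registry]}.

remove_handler(Registry, Ref) ->
    lists:keydelete(Ref, 1, Registry).

%% Applies every handler to Stanza; handlers that return stop are disposed of.
apply_all(Registry, Stanza) ->
    [{Ref, Fun} || {Ref, Fun} <- Registry, Fun(Stanza) =/= stop].

%% Wraps a trigger into a one-shot handler: on a match the waiting caller
%% receives {ok, Stanza} and the handler returns stop so it gets dropped.
wrap_trigger(Trigger, Caller) ->
    fun(Stanza) ->
            case Trigger(Stanza) of
                true -> Caller ! {ok, Stanza}, stop;
                false -> ok
            end
    end.

demo() ->
    KeepForever = fun(_Stanza) -> ok end,
    StopOnBye = fun(bye) -> stop; (_) -> ok end,
    {KeepRef, R1} = add_handler(new(), KeepForever),
    {_, R2} = add_handler(R1, StopOnBye),
    {_, R3} = add_handler(R2, wrap_trigger(fun(S) -> S =:= pong end, self())),
    3 = length(R3),
    %% The trigger matches 'pong', replies to us and is disposed of
    R4 = apply_all(R3, pong),
    receive {ok, pong} -> ok after 100 -> error(no_trigger_reply) end,
    2 = length(R4),
    %% StopOnBye returns stop on 'bye' and is disposed of
    R5 = apply_all(R4, bye),
    1 = length(R5),
    %% The remaining handler is removed explicitly via its saved reference
    [] = remove_handler(R5, KeepRef),
    ok.
```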

I'd like to take this opportunity to thank Jean-Lou Dupont for his excellent Erlang syntax highlighter, which I'm using below.

%% Author: bokner
%% Created: Feb 3, 2010
%% Description: Monitors and clears temporary pubsub subscriptions.
-module(tidy_bot).

%%
%% Include files
%%
-include_lib("exmpp/include/exmpp_client.hrl").
-include("gen_client.hrl").
%%
%% Exported Functions
%%
-export([tidy_subscriptions/5]).
%%
%% API Functions
%%
tidy_subscriptions(Jid, Password, Host, Port, PubSub) ->
 {ok, Session, _C} = gen_client:start(Jid, Host, Port, Password, dummy_client, ["On tidy duty"]),
 JidOfflineHandler = 
  fun(#received_packet{packet_type = presence, type_attr = "unavailable", from = PeerJid}, #client_state{jid = BotJid} = _State) when BotJid /= PeerJid ->
     {Node, Domain, _Resource} = PeerJid, 
     case exmpp_jid:bare_compare(BotJid, exmpp_jid:make(Node, Domain)) of
      false ->
       io:format("~p gone offline~n", [PeerJid]),
       unsubscribe_from_all_nodes(Session, PeerJid, PubSub);              
      _Other ->
       void
     end,
     ok;
   (_Other, _State) ->
    ok
  end,
 gen_client:add_handler(Session, JidOfflineHandler), 
 
 %% Get subscriptions 
 process_subscriptions(
  Session, PubSub, 
  fun(SubscriptionList) ->
     lists:foreach(fun(S) -> 
              spawn(
               fun() -> 
                  unsubscribe_temporary(Session, PubSub,
                             exmpp_xml:get_attribute(S, "jid", undefined),
                             exmpp_xml:get_attribute(S, "node", undefined),
                             exmpp_xml:get_attribute(S, "subid", undefined)
                            )
               end
                 )
            end, 
            SubscriptionList           
           ) end),
 ok.

unsubscribe_temporary(Session, PubSub, Jid, Node, _Subid) ->
 %% Prepare handler for presence
 %% Trigger: true once "available" presence arrives from the probed JID
 ProbeSuccessful = fun(#received_packet{from = FullJid, packet_type = presence, type_attr = "available"}, _State) ->
             {Acc, Domain, Resource} = FullJid,
             case exmpp_jid:parse(Jid) of
              {jid, Jid, Acc, Domain, Resource} ->
               io:format("probe matches for ~p~n", [FullJid]),
               true;
              _NoMatch ->
               io:format("probe doesn't match for ~p, ~p~n", [Jid, FullJid]),
               false
             end;
            (_NonPresence, _State) ->
             false
           end,
 %% Send presence probe
 io:format("Sending probe to ~p:~n", [Jid]),

 ProbeResult = gen_client:send_sync_packet(Session, exmpp_stanza:set_recipient(
                       exmpp_presence:probe(), Jid), ProbeSuccessful, 4000),
 io:format("result of probe for ~p:~n~p~n", [Jid, ProbeResult]),
 case ProbeResult of 
  timeout ->
   unsubscribe_from_node(Session, Jid, Node, PubSub),
   timeout;
  {ok, #received_packet{type_attr = Type}} ->
   io:format("Probe:~p:~p~n", [Jid, Type]);
  Other ->
   io:format("Unexpected:~p~n", [Other])
 end.

unsubscribe_from_node(Session, Jid, Node, PubSub) ->
 io:format("Unsubscribing ~p from ~p...", [Jid, Node]),
 gen_client:send_packet(Session, exmpp_client_pubsub:unsubscribe(Jid, PubSub, Node)),
 io:format("Done.~n").

unsubscribe_from_all_nodes(Session, {Acc, Domain, Resource} = Jid, PubSub) ->
 io:format("Unsubscribing ~p~n", [Jid]),
 process_subscriptions(
  Session, PubSub,
  fun(SList) ->
     lists:foreach(fun(S) ->
              spawn(
               fun() ->
                  JidAttr = exmpp_xml:get_attribute(S, "jid", undefined),
                  case JidAttr == exmpp_jid:to_binary(Acc, Domain, Resource) of
                   true ->
                    unsubscribe_from_node(Session, JidAttr, exmpp_xml:get_attribute(S, "node", undefined), PubSub);
                   false ->
                    void
                  end
               end)
            end,
            SList)
  end),
 ok.

process_subscriptions(Session, PubSub, Fun) ->
 {ok, SubscriptionPacket} = gen_client:send_sync_packet(Session, exmpp_client_pubsub:get_subscriptions(PubSub), 5000),
 %%io:format("Subscriptions:~p~n", [SubscriptionPacket]),
 Payload = exmpp_iq:get_payload(exmpp_iq:xmlel_to_iq(SubscriptionPacket#received_packet.raw_packet)),
 
 Fun(
  exmpp_xml:get_elements(
   exmpp_xml:get_element(Payload, "subscriptions"),
   "subscription")
   ).

Wednesday, February 10, 2010

gen_client in action: fixing temporary subscriptions

In this post I'd like to discuss one particular pubsub challenge, namely temporary subscriptions. I will also use this discussion as an opportunity to show some gen_client code that was written in order to deal with this challenge.

Let's assume we have a web-based XMPP weather service available for free public access, so everyone can come to our site, choose places and watch weather data updated in real time. This is what pubsub is designed for: data streams are the publishers and clients are the subscribers; each data stream publishes to its respective node, and clients subscribe to the nodes they are interested in. Now comes the interesting part: we expect a significant number of clients to use our service, so we don't want to create an account for each client. Instead, clients will be automatically signed in with a shared account and assigned a random resource name. XEP-0060 allows subscriptions based on full JIDs, so each client will still have its own subscriptions. However, random resource names imply temporary subscriptions: the moment a client signs out, the resource name it was using becomes unusable, and so do its subscriptions.

So how do we deal with temporary subscriptions? XEP-0060 (version 1.13rc13, section 12.4) describes how it should work: once a subscriber goes offline, the temporary subscription gets canceled. Unfortunately, it looks like ejabberd (v. 2.1.2 at the time of writing) doesn't support this yet; at least I was unable to configure temporary subscriptions the way XEP-0060 suggests. This means that once a resource is gone, its subscriptions are still hanging around. The bad (very bad) thing about it, besides excessive memory consumption, is that ejabberd will push data meant for these orphaned subscriptions to the resources of the same account that are still online. This will be an absolute mess: even if your client code is smart enough to filter out foreign subscriptions (possibly by matching subscription identifiers), enormous traffic will be generated pretty soon. Remember, we have a single shared account for all our clients. Conclusion: we absolutely have to find a way to get rid of orphaned subscriptions.

Here is the source code of the bot (using gen_client) that supports temporary subscriptions by monitoring resources and getting rid of their subscriptions the moment the resource they belong to goes offline. Additionally, it cleans up such subscriptions on startup. I'm planning to do a code walk-through and explain the gen_client capabilities shown in the code in one of the posts following shortly.

This is, of course, a temporary solution and should only be used if your XMPP server doesn't support temporary subscriptions.

I still have a feeling that there might be an easier solution, but I haven't found any practical cases of using temporary subscriptions, so if anyone has related experience, please come forward and share it here.