Thursday, February 11, 2010

gen_client example walk-through

In this post I will walk through and comment on the code for the XMPP bot that solves the problem of temporary subscriptions. Please see the previous post for an explanation of the problem.

The code uses the gen_client behaviour, an extension of the exmpp library. You can find the source code for gen_client here.

First, we start the client process using one of the variations of gen_client:start (line 19). The last two parameters are the callback module and its arguments. The module must implement the gen_client behaviour, and that is how you create your own client logic.
This example uses the dummy_client module, which we could consider a "next to minimal" implementation of gen_client: all it does is send "available" presence upon start (run/2 callback) and "unavailable" presence upon termination (stop/1 callback). How useful is this? The answer: we can add functionality to the callback module at any time during run-time, using either add_handler/2 or variations of send_sync_packet with attached triggers.

Let's look at the code for some examples:

Lines 20 through 34: we dynamically add a handler that monitors JIDs going offline. Once a JID sends "unavailable" presence, the handler cancels all of that JID's subscriptions.
As long as tidy_bot is on duty, there will be no mess anymore!

Lines 37 through 52: we also want to clean up the mess that may have been created while tidy_bot wasn't online. So we obtain the list of subscriptions using a synchronized request (line 118), and for every subscription we send a presence probe to its owner (line 73). Subscriptions whose owners do not respond to the probe are canceled. The probe request is a synchronized request with a trigger function.
Note how we receive the responses to both synchronized requests in the same process that sent them.
Because sync calls are a distinctive feature of gen_client, let me explain in more detail:

The variation send_sync_packet/4 uses a "trigger function" (the 3rd parameter), which is applied to each incoming stanza and returns true if the stanza carries a relevant response. A true result ends the processing of the synchronized request, and the calling process receives {ok, IncomingStanza}. If, on the other hand, no "triggering" stanza arrives within the timeout period (the 4th parameter of send_sync_packet/4), the calling process receives the timeout atom.
In our case, the trigger function expects to see "available" presence from the JID we've sent the probe to (line 57). If this happens within the timeout interval (4 seconds, as defined by this send_sync_packet call), the calling process receives {ok, PresenceMessage}; otherwise it receives timeout.
Note that because of the way the trigger function is constructed in this particular case, the calling process doesn't even need to analyze the message itself.
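
To make the trigger semantics concrete, here is a minimal self-contained sketch. It is not gen_client's implementation, only a model of the behaviour described above: the module name sync_sketch, the {stanza, S} message shape and the tuple stand-ins for stanzas are all assumptions made for illustration, and the trigger takes one argument here, whereas the real triggers in the listing also receive the client state.

```erlang
-module(sync_sketch).
-export([await/2, demo/0]).

%% Model of a synchronized request: wait until an incoming {stanza, S}
%% message satisfies Trigger, or until Timeout milliseconds have elapsed.
%% Returns {ok, S} or the atom timeout, as described in the post.
await(Trigger, Timeout) ->
    Deadline = erlang:monotonic_time(millisecond) + Timeout,
    await_until(Trigger, Deadline).

await_until(Trigger, Deadline) ->
    Remaining = max(0, Deadline - erlang:monotonic_time(millisecond)),
    receive
        {stanza, Stanza} ->
            case Trigger(Stanza) of
                true -> {ok, Stanza};
                %% Not the response we are waiting for: keep waiting.
                _ -> await_until(Trigger, Deadline)
            end
    after Remaining ->
        timeout
    end.

demo() ->
    %% Hypothetical stand-in stanzas: {presence, Type, FromJid}.
    Trigger = fun({presence, "available", "alice@example.com"}) -> true;
                 (_Other) -> false
              end,
    self() ! {stanza, {presence, "unavailable", "bob@example.com"}},
    self() ! {stanza, {presence, "available", "alice@example.com"}},
    await(Trigger, 4000).
```

In this model, sync_sketch:demo() skips the unrelated stanza and returns {ok, {presence, "available", "alice@example.com"}}, while a trigger that never returns true yields timeout once the interval expires.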

The simpler variation, send_sync_packet/3, does not take a trigger function. In this case a default "trigger function" is used, which just tries to match the identifiers of the incoming and outgoing stanzas. So send_sync_packet/3 is most appropriate for sending IQ stanzas, where the response almost always carries the same identifier as the request.
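
The idea of a default trigger that matches identifiers can be sketched as follows. Again, this is only an illustration under assumptions: the module id_trigger_sketch and the {Kind, Id, Body} tuples are hypothetical stand-ins for real stanzas, not how gen_client represents them.

```erlang
-module(id_trigger_sketch).
-export([id_trigger/1, demo/0]).

%% Build a trigger from an outgoing request: it fires on any incoming
%% stanza whose identifier equals the identifier of the request.
%% Stand-in stanza shape (an assumption): {Kind, Id, Body}.
id_trigger({_Kind, RequestId, _Body}) ->
    fun({_, Id, _}) -> Id =:= RequestId;
       (_Other) -> false
    end.

demo() ->
    Request = {iq, <<"req-42">>, get_subscriptions},
    Trigger = id_trigger(Request),
    Incoming = [{presence, <<"p-1">>, available},
                {iq, <<"req-41">>, result},
                {iq, <<"req-42">>, result}],
    %% Only the stanza carrying the request's identifier passes.
    [S || S <- Incoming, Trigger(S)].
```

Here id_trigger_sketch:demo() returns [{iq, <<"req-42">>, result}], which is why this style of matching suits IQ request/response pairs and not presence, where the reply does not echo an identifier.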

There is also a case of an asynchronous request (line 88). Here it is a "fire and forget" approach, i.e. we don't want to analyze the response. If we did, we could do so by adding a handler function, as you've already seen.

It's important to note that a synchronous request blocks only the calling process, not the handling of incoming stanzas. So, in our example, the handler we have set up for monitoring "unavailable" presence stays operational while the probe requests are being sent. Moreover, every handler call spawns a separate process, so incoming stanzas don't wait for prior ones to be processed.
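
A small model shows why running each handler call in its own process keeps a slow handler from delaying later stanzas. The module spawn_sketch and the atoms slow and fast are hypothetical names for this illustration, and the code says nothing about how gen_client itself spawns handlers.

```erlang
-module(spawn_sketch).
-export([dispatch/2, demo/0]).

%% Run Handler(Stanza) in a separate process and report the result
%% back to the process that called dispatch/2.
dispatch(Handler, Stanza) ->
    Parent = self(),
    spawn(fun() -> Parent ! {done, Handler(Stanza)} end).

demo() ->
    Handler = fun(slow) -> timer:sleep(200), slow;
                 (fast) -> fast
              end,
    %% The slow stanza is dispatched first, yet it does not hold up
    %% the fast one, because each call runs in its own process.
    dispatch(Handler, slow),
    dispatch(Handler, fast),
    First = receive {done, A} -> A end,
    Second = receive {done, B} -> B end,
    [First, Second].
```

Calling spawn_sketch:demo() reports the fast stanza as finished before the slow one, even though the slow one was dispatched first.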

A little more about handlers and triggers. Internally, gen_client applies each added handler to every incoming stanza. Triggers implicitly add specialized handlers that "wrap" the trigger function in the appropriate call. After a handler has been applied to a stanza, gen_client decides whether to keep the handler for subsequent processing or to dispose of it. Trigger-based handlers always get disposed of, since their purpose is to serve a single synchronous request. As for other handlers, the rule is that a handler is applied to all incoming stanzas until it returns the stop atom, at which point it is disposed of. It is also possible to remove a handler explicitly (gen_client:remove_handler/2), provided you saved the handler reference returned by add_handler/2.
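
The keep-or-dispose rule can be modelled in a few lines. This is a sketch under stated assumptions, not gen_client's internals: the module handler_sketch, the {Ref, Fun} list representation and the tuple stanzas are invented for illustration, and handlers run inline here for simplicity.

```erlang
-module(handler_sketch).
-export([dispatch/3, remove/2, demo/0]).

%% Apply every handler to Stanza and keep only those that did not
%% return the stop atom. Handlers is a list of {Ref, Fun} pairs.
dispatch(Stanza, State, Handlers) ->
    [{Ref, H} || {Ref, H} <- Handlers, H(Stanza, State) =/= stop].

%% Explicit removal by reference, in the spirit of remove_handler/2.
remove(Ref, Handlers) ->
    lists:keydelete(Ref, 1, Handlers).

demo() ->
    Forever = fun(_Stanza, _State) -> ok end,
    %% Disposes of itself after the first "unavailable" presence.
    OneShot = fun({presence, "unavailable", _From}, _State) -> stop;
                 (_Other, _State) -> ok
              end,
    Handlers0 = [{forever, Forever}, {one_shot, OneShot}],
    Handlers1 = dispatch({presence, "available", "a@x"}, state, Handlers0),
    Handlers2 = dispatch({presence, "unavailable", "a@x"}, state, Handlers1),
    Handlers3 = remove(forever, Handlers2),
    {[R || {R, _} <- Handlers1],
     [R || {R, _} <- Handlers2],
     [R || {R, _} <- Handlers3]}.
```

In this model handler_sketch:demo() returns {[forever, one_shot], [forever], []}: the one-shot handler survives the first stanza, is disposed of when it returns stop, and the remaining handler disappears only when it is removed by reference.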

I want to take the opportunity to thank Jean-Lou Dupont for his excellent Erlang syntax highlighter, which I'm using below.

%% Author: bokner
%% Created: Feb 3, 2010
%% Description: Monitors and clears temporary pubsub subscriptions.
-module(tidy_bot).

%%
%% Include files
%%
-include_lib("exmpp/include/exmpp_client.hrl").
-include("gen_client.hrl").
%%
%% Exported Functions
%%
-export([tidy_subscriptions/5]).
%%
%% API Functions
%%
tidy_subscriptions(Jid, Password, Host, Port, PubSub) ->
 {ok, Session, _C} = gen_client:start(Jid, Host, Port, Password, dummy_client, ["On tidy duty"]),
 JidOfflineHandler = 
  fun(#received_packet{packet_type = presence, type_attr = "unavailable", from = PeerJid}, #client_state{jid = BotJid} = _State) when BotJid /= PeerJid ->
     {Node, Domain, _Resource} = PeerJid, 
     case exmpp_jid:bare_compare(BotJid, exmpp_jid:make(Node, Domain)) of
      false ->
       io:format("~p gone offline~n", [PeerJid]),
       unsubscribe_from_all_nodes(Session, PeerJid, PubSub);              
      _Other ->
       void
     end,
     ok;
   (_Other, _Session) ->
    ok
  end,
 gen_client:add_handler(Session, JidOfflineHandler), 
 
 %% Get subscriptions 
 process_subscriptions(
  Session, PubSub, 
  fun(SubscriptionList) ->
     lists:foreach(fun(S) -> 
              spawn(
               fun() -> 
                  unsubscribe_temporary(Session, PubSub,
                             exmpp_xml:get_attribute(S, "jid", undefined),
                             exmpp_xml:get_attribute(S, "node", undefined),
                             exmpp_xml:get_attribute(S, "subid", undefined)
                            )
               end
                 )
            end, 
            SubscriptionList           
           ) end),
 ok.

unsubscribe_temporary(Session, PubSub, Jid, Node, _Subid) ->
 %% Prepare handler for presence
 ProbeSuccessfull = fun(#received_packet{from = FullJid, packet_type = presence, type_attr = "available"}, _State) ->
             {Acc, Domain, Resource} = FullJid,          
             case exmpp_jid:parse(Jid) of
              {jid, Jid, Acc, Domain, Resource} ->
               io:format("probe matches for ~p~n", [FullJid]),
               true;
              _NoMatch ->
               io:format("probe doesn't match for ~p, ~p~n", [Jid, FullJid]),
               false
             end;
            (_NonPresence, _State) ->
             false
           end, 
 %% Send presence probe
 io:format("Sending probe to ~p:~n", [Jid]),
 
 ProbeResult = gen_client:send_sync_packet(Session, exmpp_stanza:set_recipient(
                       exmpp_presence:probe(), Jid), ProbeSuccessfull, 4000),
 io:format("result of probe for ~p:~n~p~n", [Jid, ProbeResult]),
 case ProbeResult of 
  timeout ->
   unsubscribe_from_node(Session, Jid, Node, PubSub),
   timeout;
  {ok, #received_packet{type_attr = Type}} ->
   io:format("Probe:~p:~p~n", [Jid, Type]);
  Other ->
   io:format("Unexpected:~p~n", [Other])
 end.

unsubscribe_from_node(Session, Jid, Node, PubSub) ->
 io:format("Unsubscribing ~p from ~p...", [Jid, Node]),
 gen_client:send_packet(Session, exmpp_client_pubsub:unsubscribe(Jid, PubSub, Node)),
 io:format("Done.~n").

unsubscribe_from_all_nodes(Session, {Acc, Domain, Resource} = Jid, PubSub) ->
 io:format("Unsubscribing ~p~n", [Jid]),
 process_subscriptions(
  Session, PubSub, 
  fun(SList) ->
     lists:foreach(fun(S) -> 
              spawn(
               fun() -> 
                  JidAttr = exmpp_xml:get_attribute(S, "jid", undefined),
                  case JidAttr == exmpp_jid:to_binary(Acc, Domain, Resource) of
                   true ->
                    unsubscribe_from_node(Session, JidAttr, exmpp_xml:get_attribute(S, "node", undefined), PubSub);
                   false ->
                    void
                  end
               
               end
                 )
            
            end, 
            SList
           
           ) end        
            ),
 ok.

process_subscriptions(Session, PubSub, Fun) ->
 {ok, SubscriptionPacket} = gen_client:send_sync_packet(Session, exmpp_client_pubsub:get_subscriptions(PubSub), 5000),
 %%io:format("Subscriptions:~p~n", [SubscriptionPacket]),
 Payload = exmpp_iq:get_payload(exmpp_iq:xmlel_to_iq(SubscriptionPacket#received_packet.raw_packet)),
 
 Fun(
  exmpp_xml:get_elements(
   exmpp_xml:get_element(Payload, "subscriptions"),
   "subscription")
   ).