首页>代码>【小牛分享】- pushlet指定用户推送消息>/pushletTest/src/nl/justobjects/pushlet/client/PushletClient.java
// Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
// Distributable under LGPL license. See terms of license at gnu.org.

package nl.justobjects.pushlet.client;

import nl.justobjects.pushlet.core.Event;
import nl.justobjects.pushlet.core.EventParser;
import nl.justobjects.pushlet.core.Protocol;
import nl.justobjects.pushlet.util.PushletException;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.OutputStream;
import java.net.*;
import java.util.Map;

/**
 * Client API for Java HTTP client applets or apps.
 * <p/>
 * Use this class within Java client applications or applets.
 * Implement a PushletClientListener to receive callbacks for
 * data-related Event objects pushed by the server.
 * <p/>
 * This class may also be used as a base class and be extended
 * for custom clients, hence the presence of many proteced methods.
 *
 * @author Just van den Broecke - Just Objects &copy;
 * @version $Id: PushletClient.java,v 1.19 2009/06/04 12:46:35 justb Exp $
 * @see PushletClientListener
 * @see nl.justobjects.pushlet.test.PushletApplet
 * @see nl.justobjects.pushlet.test.PushletPingApplication
 */
public class PushletClient implements Protocol {
	/**
	 * Pushlet URL.
	 */
	private String pushletURL;

	/**
	 * Debug flag for verbose output.
	 */
	private boolean debug;

	/**
	 * Id gotten on join ack
	 */
	private String id;

	/**
	 * Internal listener for data events pushed by server.
	 */
	protected DataEventListener dataEventListener;

	/**
	 * Constructor with full pushlet URL.
	 */
	public PushletClient(String aPushletURL) {
		pushletURL = aPushletURL;
	}

	/**
	 * Constructor with host and port using default URI.
	 */
	public PushletClient(String aHost, int aPort) {
		this("http://" + aHost + ":" + aPort + DEFAULT_SERVLET_URI);
	}

	/**
	 * Set proxy options and optional proxy authentication.
	 * <p/>
	 * Contributed by Dele Olajide
	 * See http://groups.yahoo.com/group/pushlet/message/634
	 * <p/>
	 * Usage:
	 * PushletClient pushletClient = new PushletClient("http:://www.domain.com/pushlet");
	 * pushletClient.setProxyOptions("proxy.bla.com", "8080", ....);
	 * <p/>
	 * use pushletClient further as normal
	 */
	public void setProxyOptions(String aProxyHost,
	                            String aProxyPort, String theNonProxyHosts,
	                            String aUserName, String aPassword, String anNTLMDomain) {

		// Enable proxying
		System.setProperty("http.proxySet", "true");
		System.setProperty("http.proxyHost", aProxyHost);
		System.setProperty("http.proxyPort", aProxyPort);

		// Set optional non-proxy hosts
		if (theNonProxyHosts != null) {
			System.setProperty("http.nonProxyHosts", theNonProxyHosts);
		}

		// If user name specified configure proxy authentication
		if (aUserName != null) {
			System.setProperty("http.proxyUser", aUserName);
			System.setProperty("http.proxyPassword", aPassword);

			// See inner class below
			Authenticator.setDefault(new HTTPAuthenticateProxy(aUserName, aPassword));

			// Optional NT domain
			if (anNTLMDomain != null) {
				System.setProperty("http.auth.ntlm.domain", anNTLMDomain);
			}
		}
	}

	/**
	 * Join server, starts session.
	 */
	public void join() throws PushletException {
		Event event = new Event(E_JOIN);
		event.setField(P_FORMAT, FORMAT_XML);
		Event response = doControl(event);
		throwOnNack(response);

		// Join Ack received
		id = response.getField(P_ID);
	}

	/**
	 * Leave server, stops session.
	 */
	public void leave() throws PushletException {
		stopListen();
		throwOnInvalidSession();
		Event event = new Event(E_LEAVE);
		event.setField(P_ID, id);
		Event response = doControl(event);

		throwOnNack(response);
		id = null;
	}

	/**
	 * Open data channel.
	 */
	public void listen(PushletClientListener aListener) throws PushletException {
		listen(aListener, MODE_STREAM);
	}

	/**
	 * Open data channel in stream or push mode.
	 */
	public void listen(PushletClientListener aListener, String aMode) throws PushletException {
		listen(aListener, aMode, null);
	}

	/**
	 * Open data channel in stream or push mode with a subject.
	 */
	public void listen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
		throwOnInvalidSession();
		stopListen();

		String listenURL = pushletURL
				+ "?" + P_EVENT + "=" + E_LISTEN
				+ "&" + P_ID + "=" + id
				+ "&" + P_MODE + "=" + aMode;
		if (aSubject != null) {
			listenURL = listenURL + "&" + P_SUBJECT + "=" + aSubject;
		}

		// Start listener thread (sync call).
		startDataEventListener(aListener, listenURL);
	}

	/**
	 * Immediate listener: joins/subscribes and listens in one action.
	 */
	public void joinListen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
		stopListen();

		String listenURL = pushletURL
				+ "?" + P_EVENT + "=" + E_JOIN_LISTEN
				+ "&" + P_FORMAT + "=" + FORMAT_XML
				+ "&" + P_MODE + "=" + aMode
				+ "&" + P_SUBJECT + "=" + aSubject;

		// Start listener thread (sync call).
		startDataEventListener(aListener, listenURL);
	}

	/**
	 * Publish an event through server.
	 */
	public void publish(String aSubject, Map theAttributes) throws PushletException {
		throwOnInvalidSession();
		Event event = new Event(E_PUBLISH, theAttributes);
		event.setField(P_SUBJECT, aSubject);
		event.setField(P_ID, id);
		Event response = doControl(event);
		throwOnNack(response);
	}

	/**
	 * Subscribes, returning subscription id.
	 */
	public String subscribe(String aSubject, String aLabel) throws PushletException {
		throwOnInvalidSession();
		Event event = new Event(E_SUBSCRIBE);
		event.setField(P_ID, id);
		event.setField(P_SUBJECT, aSubject);

		// Optional label, is returned in data events
		if (aLabel != null) {
			event.setField(P_SUBSCRIPTION_LABEL, aLabel);
		}

		// Send request
		Event response = doControl(event);
		throwOnNack(response);

		return response.getField(P_SUBSCRIPTION_ID);
	}

	/**
	 * Subscribes, returning subscription id.
	 */
	public String subscribe(String aSubject) throws PushletException {
		return subscribe(aSubject, null);
	}

	/**
	 * Unsubscribes with subscription id.
	 */
	public void unsubscribe(String aSubscriptionId) throws PushletException {
		throwOnInvalidSession();
		Event event = new Event(E_UNSUBSCRIBE);
		event.setField(P_ID, id);

		// Optional subscription id
		if (aSubscriptionId != null) {
			event.setField(P_SUBSCRIPTION_ID, aSubscriptionId);
		}

		Event response = doControl(event);
		throwOnNack(response);
	}

	/**
	 * Unsubscribes from all subjects.
	 */
	public void unsubscribe() throws PushletException {
		unsubscribe(null);
	}

	/**
	 * Stop the listener.
	 */
	public void stopListen() throws PushletException {
		if (dataEventListener != null) {
			unsubscribe();
			dataEventListener.stop();
			dataEventListener = null;
		}
	}

	public void setDebug(boolean b) {
		debug = b;
	}

	/**
	 * Starts default DataEventListener and waits for its thread to start.
	 */
	protected void startDataEventListener(PushletClientListener aListener, String aListenURL) {
		// Suggestion by Jeff Nowakowski 29.oct.2006
		dataEventListener = new DataEventListener(aListener, aListenURL);

		synchronized (dataEventListener) {
			dataEventListener.start();
			try {
				// Wait for data event listener (thread) to start
				dataEventListener.wait();
			} catch (InterruptedException e) {
			}
		}
	}

	protected void throwOnNack(Event anEvent) throws PushletException {
		if (anEvent.getEventType().equals(E_NACK)) {
			throw new PushletException("Negative response: reason=" + anEvent.getField(P_REASON));
		}
	}

	protected void throwOnInvalidSession() throws PushletException {
		if (id == null) {
			throw new PushletException("Invalid pushlet session");
		}
	}

	protected Reader openURL(String aURL) throws PushletException {
		// Open URL connection with server
		try {
			p("Connecting to " + aURL);
			URL url = new URL(aURL);
			URLConnection urlConnection = url.openConnection();

			// Disable any kind of caching.
			urlConnection.setUseCaches(false);
			urlConnection.setDefaultUseCaches(false);

			// TODO: later version may use POST
			// Enable HTTP POST
			// urlConnection.setDoOutput(true);

			// Do the POST with Event in XML in body
			// OutputStream os = urlConnection.getOutputStream();
			// os.write(anEvent.toXML().getBytes());
			// os.flush();
			// os.close();

			// Get the stream from the server.
			// reader = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
			// Note: somehow the client does not work with some JVMs when using
			// BufferedInputStream... So do unbuffered input.
			// p("Opening urlConnection inputstream");
			return new InputStreamReader(urlConnection.getInputStream());

		} catch (Throwable t) {
			warn("openURL() could not open " + aURL, t);
			throw new PushletException(" could not open " + aURL, t);
		}
	}


	/**
	 * Send control events to server and return response.
	 */
	protected Event doControl(Event aControlEvent) throws PushletException {
		String controlURL = pushletURL + "?" + aControlEvent.toQueryString();

		p("doControl to " + controlURL);

		// Open URL connection with server
		Reader reader = openURL(controlURL);

		// Get Pushlet event from stream
		Event event = null;
		try {
			p("Getting event...");
			// Get next event from server
			event = EventParser.parse(reader);
			p("Event received " + event);
			return event;
		} catch (Throwable t) {
			// Stop and report error.
			warn("doControl() exception", t);
			throw new PushletException(" error parsing response from" + controlURL, t);
		}
	}

	/**
	 * Util: print.
	 */
	protected void p(String s) {
		if (debug) {
			System.out.println("[PushletClient] " + s);
		}
	}

	/**
	 * Util: warn.
	 */
	protected void warn(String s) {
		warn(s, null);
	}

	/**
	 * Util: warn with exception.
	 */
	protected void warn(String s, Throwable t) {
		System.err.println("[PushletClient] - WARN - " + s + " ex=" + t);

		if (t != null) {
			t.printStackTrace();
		}
	}

	/**
	 * Internal (default) listener for the Pushlet data channel.
	 */
	protected class DataEventListener implements Runnable {
		/**
		 * Client's listener that gets called back on events.
		 */
		private PushletClientListener listener;

		/**
		 * Receiver receiveThread.
		 */
		private Thread receiveThread = null;
		private Reader reader;
		private String refreshURL;
		private String listenURL;

		public DataEventListener(PushletClientListener aListener, String aListenURL) {
			listener = aListener;
			listenURL = aListenURL;
		}

		public void start() {
			// All ok: start a receiver receiveThread
			receiveThread = new Thread(this);
			receiveThread.start();

		}

		/**
		 * Stop listening; may restart later with start().
		 */
		public void stop() {
			p("In stop()");
			bailout();
		}

		/**
		 * Receive event objects from server and callback listener.
		 */
		public void run() {
			p("Start run()");
			try {
				while (receiveThread != null && receiveThread.isAlive()) {
					// Connect to server
					reader = openURL(listenURL);

					synchronized (this) {
						// Inform the calling thread we're ready to receive events.
						// Suggestion by Jeff Nowakowski 29.oct.2006
						this.notify();
					}

					// Get events while we're alive.
					while (receiveThread != null && receiveThread.isAlive()) {
						Event event = null;
						try {
							// p("Getting event...");
							// Get next event from server
							event = EventParser.parse(reader);
							p("Event received " + event);
						} catch (Throwable t) {

							// Stop and report error.
							// warn("Stop run() on exception", t);
							if (listener != null) {
								listener.onError("exception during receive: " + t);
							}

							break;
						}

						// Handle event by calling listener
						if (event != null && listener != null) {
							// p("received: " + event.toXML());
							String eventType = event.getEventType();
							if (eventType.equals(E_HEARTBEAT)) {
								listener.onHeartbeat(event);
							} else if (eventType.equals(E_DATA)) {
								listener.onData(event);
							} else if (eventType.equals(E_JOIN_LISTEN_ACK)) {
								id = event.getField(P_ID);
							} else if (eventType.equals(E_LISTEN_ACK)) {
								p("Listen ack ok");
							} else if (eventType.equals(E_REFRESH_ACK)) {
								// ignore
							} else if (eventType.equals(E_ABORT)) {
								listener.onAbort(event);
								listener = null;
								break;
							} else if (eventType.equals(E_REFRESH)) {
								refresh(event);
							} else {
								handleUnknownEventType(eventType, event, listener);
							}
						}
					}
				}
			} catch (Throwable t) {
				warn("Exception in run() ", t);
				// bailout();
			}
		}

		protected void disconnect() {
			p("start disconnect()");
			if (reader != null) {
				try {
					// this blocks, find another way
					// reader.close();
					p("Closed reader ok");
				} catch (Exception ignore) {
				} finally {
					reader = null;
				}
			}
			p("end disconnect()");
		}

		/**
		 * Stop receiver receiveThread.
		 */
		public void stopThread() {
			p("In stopThread()");

			// Keep a reference such that we can kill it from here.
			Thread targetThread = receiveThread;

			receiveThread = null;

			// This should stop the main loop for this receiveThread.
			// Killing a receiveThread on a blcing read is tricky.
			// See also http://gee.cs.oswego.edu/dl/cpj/cancel.html
			if ((targetThread != null) && targetThread.isAlive()) {

				targetThread.interrupt();

				try {

					// Wait for it to die
					targetThread.join(500);
				} catch (InterruptedException ignore) {
				}

				// If current receiveThread refuses to die,
				// take more rigorous methods.
				if (targetThread.isAlive()) {

					// Not preferred but may be needed
					// to stop during a blocking read.
					targetThread.stop();

					// Wait for it to die
					try {
						targetThread.join(500);
					} catch (Throwable ignore) {
					}
				}

				p("Stopped receiveThread alive=" + targetThread.isAlive());

			}
		}

		/**
		 * Stop listening on stream from server.
		 */
		public void bailout() {
			p("In bailout()");
			stopThread();
			disconnect();
		}

		/**
		 * Handle refresh, by pausing.
		 */
		protected void refresh(Event aRefreshEvent) throws PushletException {
			try {
				// Wait for specified time.
				Thread.sleep(Long.parseLong(aRefreshEvent.getField(P_WAIT)));
			} catch (Throwable t) {
				warn("abort while refresing");
				refreshURL = null;
				return;
			}

			// If stopped during sleep, don't proceed
			if (receiveThread == null) {
				return;
			}

			// Create url to refresh
			refreshURL = pushletURL
					+ "?" + P_ID + "=" + id
					+ "&" + P_EVENT + "=" + E_REFRESH
					;

			if (reader != null) {
				try {
					reader.close();

				} catch (IOException ignore) {

				}
				reader = null;
			}

			reader = openURL(refreshURL);
		}

		/**
		 * Handle unknown Event (default behaviour).
		 */
		protected void handleUnknownEventType(String eventType, Event event, PushletClientListener listener) {
			warn("unsupported event type received: " + eventType);
		}
	}

	/**
	 * Authenticator
	 */
	private static class HTTPAuthenticateProxy extends Authenticator {

		/**
		 * Contributed by Dele Olajide
		 * See http://groups.yahoo.com/group/pushlet/message/634
		 */

		private String thePassword = "";
		private String theUser = "";

		public HTTPAuthenticateProxy(String username, String password) {

			thePassword = password;
			theUser = username;
		}

		protected PasswordAuthentication getPasswordAuthentication() {
			// System.out.println("[HttpAuthenticateProxy] Username = " + theUser);
			// System.out.println("[HttpAuthenticateProxy] Password = " + thePassword);

			return new PasswordAuthentication(theUser, thePassword.toCharArray());
		}

	}

}

/*
 * $Log: PushletClient.java,v $
 * Revision 1.19  2009/06/04 12:46:35  justb
 * PushletClient: add more hooks for extension (feat ID: 2799694 Craig M)
 *
 * Revision 1.18  2007/11/10 13:52:47  justb
 * make startDataEventListener method protected to allow overriding
 *
 * Revision 1.17  2006/10/29 16:47:57  justb
 * included patch from Jeff Nowakowski: wait until listener thread runs
 *
 * Revision 1.16  2005/05/06 20:08:20  justb
 * client enhancements
 *
 * Revision 1.15  2005/03/27 17:42:27  justb
 * enhancements
 *
 * Revision 1.14  2005/03/25 23:54:04  justb
 * *** empty log message ***
 *
 * Revision 1.13  2005/02/28 16:59:40  justb
 * fixes for leave and disconnect
 *
 * Revision 1.12  2005/02/28 15:57:54  justb
 * added SimpleListener example
 *
 * Revision 1.11  2005/02/21 12:31:44  justb
 * added proxy contribution from Dele Olajide
 *
 * Revision 1.10  2005/02/20 13:05:32  justb
 * removed the Postlet (integrated in Pushlet protocol)
 *
 * Revision 1.9  2005/02/18 10:07:23  justb
 * many renamings of classes (make names compact)
 *
 * Revision 1.8  2005/02/18 09:54:12  justb
 * refactor: rename Publisher Dispatcher and single Subscriber class
 *
 * Revision 1.7  2005/02/15 15:46:30  justb
 * client API improves
 *
 * Revision 1.6  2005/02/15 13:28:56  justb
 * first quick rewrite adapt for v2 protocol
 *
 * Revision 1.5  2004/10/25 21:23:44  justb
 * *** empty log message ***
 *
 * Revision 1.4  2004/10/24 13:52:51  justb
 * small fixes in client lib
 *
 * Revision 1.3  2004/10/24 12:58:18  justb
 * revised client and test classes for new protocol
 *
 * Revision 1.2  2004/09/03 22:35:37  justb
 * Almost complete rewrite, just checking in now
 *
 * Revision 1.1  2004/03/10 20:14:17  justb
 * renamed all *JavaPushletClient* to *PushletClient*
 *
 * Revision 1.10  2004/03/10 15:45:55  justb
 * many cosmetic changes
 *
 * Revision 1.9  2003/08/17 20:30:20  justb
 * cosmetic changes
 *
 * Revision 1.8  2003/08/15 08:37:40  justb
 * fix/add Copyright+LGPL file headers and footers
 *
 *
 */
最近下载更多
912299793  LV21 2021年2月10日
libin19830831  LV1 2021年1月11日
ihopefree  LV1 2018年12月20日
1154678481  LV9 2018年10月10日
a987013741  LV9 2018年8月10日
zhangtiancheng  LV10 2018年7月25日
ZeSino  LV5 2018年6月27日
swc1234  LV1 2018年6月11日
C544350851  LV26 2018年5月9日
xuxu002_long  LV8 2018年2月16日
最近浏览更多
lironggang  LV38 2023年4月25日
好的好的  LV8 2022年7月13日
nibin576  LV6 2022年5月31日
 LV1 2021年11月24日
13798956075  LV1 2021年10月8日
长得丑活得久  LV3 2021年3月8日
912299793  LV21 2021年2月10日
libin19830831  LV1 2021年1月11日
dongzhan  LV12 2020年12月22日
油炸糖醋鲤鱼  LV5 2020年10月7日
顶部 客服 微信二维码 底部
>扫描二维码关注最代码为好友扫描二维码关注最代码为好友