Pluggable parts


Some parts of NetarchiveSuite can be swapped out for other implementations, in a way similar to the pluggable modules used by Heritrix.

Also include relevant parts of the design document that was the basis for the implementation of [:Development/Plugins:plug-ins]

[To be introduced more]

How pluggability works

Factories [To be described more]
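
Pluggable implementations are generally selected through a setting that names the implementation class, which a factory then instantiates by reflection. The sketch below only illustrates that pattern; it is not the actual NetarchiveSuite factory code, and the class and method names are made up for the example. In NetarchiveSuite this role is played by factory classes such as SettingsFactory, which read the implementation class name from the settings (see the DBSpecifics transcript below).

import java.lang.reflect.Constructor;

/** Minimal sketch of reflection-based plugin loading; names are made up. */
public class PluginLoader {
    /**
     * Instantiate the class named by className, which must implement or
     * extend the expected plugin type.
     *
     * @param expectedType The interface or abstract class of the pluggable part.
     * @param className    Fully qualified name of the implementation class.
     * @return A new instance of the named class, cast to the expected type.
     */
    public static <T> T load(Class<T> expectedType, String className) {
        try {
            Class<?> implementationClass = Class.forName(className);
            Constructor<?> constructor = implementationClass.getDeclaredConstructor();
            constructor.setAccessible(true); // plugin constructors may be non-public
            return expectedType.cast(constructor.newInstance());
        } catch (Exception e) {
            throw new IllegalArgumentException(
                    "Could not instantiate plugin class " + className, e);
        }
    }
}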

...request for suggestions on pluggability areas [To be described more]

RemoteFile

The RemoteFile interface defines how large chunks of data are transferred between machines in a NetarchiveSuite installation. This is necessary because JMS has a relatively low limit on message size, well below the several hundred megabytes to over a gigabyte that can easily be stored in an ARC file. Several implementations are available in the default distribution.

All of the implementations detect when zero bytes are to be transferred and avoid creating an unnecessary file in that case.

The RemoteFile interface is defined by the Java interface dk.netarkivet.common.distribute.RemoteFile:

/**
 *  RemoteFile: Interface for encapsulating remote files.
 *  Enables us to transmit large files between system components situated
 *  on different machines.
 *  Our current JMS broker(s) do not allow large messages
 *  (i.e. messages > 70 MB).
 */
public interface RemoteFile extends Serializable {
    /**
     * Copy remotefile to local disk storage.
     * Used by the data recipient
     * @param destFile local File
     * @throws IOFailure on communication trouble.
     * @throws ArgumentNotValid on null parameter or non-writable file
     */
    public void copyTo(File destFile);
    /**
     * Write the contents of this remote file to an output stream.
     * @param out OutputStream that the data will be written to.  This stream
     * will not be closed by this operation.
     * @throws IOFailure If append operation fails
     * @throws ArgumentNotValid on null parameter
     */
    public void appendTo(OutputStream out);
    /**
     * Get an inputstream that contains the data transferred in this RemoteFile.
     * @return A stream object with the data in the object.  Note that the
     * close() method of this may throw exceptions if e.g. a transmission error
     * is detected.
     * @throws IOFailure on communication trouble.
     */
    public InputStream getInputStream();
    /**
     * Return the file name.
     * @return the file name
     */
    public String getName();
    /**
     * Returns an MD5 checksum of the file. May return null if checksums are
     * not supported for this operation.
     * @return MD5 checksum
     */
    public String getChecksum();
    /**
     * Cleanup this remote file. The file is invalid after this.
     */
    public void cleanup();
    /** Returns the total size of the remote file.
     * @return Size of the remote file.
     */
    public long getSize();
}

The above code may differ from the latest version in our repository: https://gforge.statsbiblioteket.dk/plugins/scmsvn/viewcvs.php/*checkout*/trunk/src/dk/netarkivet/common/distribute/RemoteFile.java?root=netarchivesuite (requires access to our code repository).
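
The sketch below shows how a receiving component typically consumes a RemoteFile. It is an illustration only, not code from the distribution; the class name and the assumption that the RemoteFile arrives as part of a NetarkivetMessage are made up for the example.

import java.io.File;
import dk.netarkivet.common.distribute.RemoteFile;

/** Sketch of a typical recipient of a RemoteFile; not part of the distribution. */
public class RemoteFileRecipientSketch {
    /**
     * Copy the transferred data to local disk and release the remote resources.
     * The RemoteFile is assumed to have arrived as part of a NetarkivetMessage.
     */
    public static void receive(RemoteFile remoteFile, File destination) {
        remoteFile.copyTo(destination); // fetch the data to local disk storage
        String checksum = remoteFile.getChecksum(); // may be null if unsupported
        if (checksum != null) {
            System.out.println("Transferred " + remoteFile.getName()
                               + " with checksum " + checksum);
        }
        remoteFile.cleanup(); // the RemoteFile is invalid after this call
    }
}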

JMSConnection

The JMSConnection provides access to a specific JMS connection. The default NetarchiveSuite distribution contains only one implementation, namely JMSConnectionSunMQ, which uses Sun's OpenMQ broker. We recommend using this implementation, as other implementations have previously been found to violate assumptions that NetarchiveSuite depends on.

The JMSConnection interface is defined by the abstract Java class dk.netarkivet.common.distribute.JMSConnection.

Implementations need to implement the three abstract methods of this class:

ConnectionFactory getConnectionFactory() throws JMSException;
Destination getDestination(String destinationName)throws JMSException;
void onException(JMSException e);

These are explained more fully in the following transcript of the class.

/**
 * Handles the communication with a JMS broker. Note on Thread-safety: the
 * methods and fields of JMSConnection are not accessed by multiple threads
 * (though JMSConnection itself creates threads). Thus no synchronization is
 * needed on methods and fields of JMSConnection. A shutdown hook is also added,
 * which closes the connection. Class JMSConnection is now also an
 * exception handler for the JMS connections.
 */
public abstract class JMSConnection implements ExceptionListener, CleanupIF {
    /** The log. */
    private static final Log log = LogFactory.getLog(JMSConnection.class);
    /**
     * Separator used in the consumer key. Separates the ChannelName from the
     * MessageListener.toString().
     */
    protected static final String CONSUMER_KEY_SEPARATOR = "##";
    /** The number of times to (re)try whenever a JMSException is thrown. */
    protected static final int JMS_MAXTRIES = 3;
    /** The JMS Connection. */
    protected Connection connection;
    /**
     * The Session handling messages sent to / received from the NetarchiveSuite
     * queues and topics.
     */
    protected Session session;
    /** Map for caching message producers. */
    protected final Map<String, MessageProducer> producers
            = Collections.synchronizedMap(
            new HashMap<String, MessageProducer>());
    /**
     * Map for caching message consumers (topic-subscribers and
     * queue-receivers).
     */
    protected final Map<String, MessageConsumer> consumers
            = Collections.synchronizedMap(
            new HashMap<String, MessageConsumer>());
    /**
     * Map for caching message listeners (topic-subscribers and
     * queue-receivers).
     */
    protected final Map<String, MessageListener> listeners
            = Collections.synchronizedMap(
            new HashMap<String, MessageListener>());
    /**
     * Lock for the connection. Locked for read on adding/removing listeners and
     * sending messages. Locked for write when connecting, releasing and
     * reconnecting.
     */
    protected final ReentrantReadWriteLock connectionLock
            = new ReentrantReadWriteLock();
    /** Shutdown hook that closes the JMS connection. */
    protected Thread closeHook;
    /**
     * The singleton pattern is used for this class. This is the one and only
     * instance.
     */
    protected static JMSConnection instance;
    /**
     * Should be implemented according to a specific JMS broker.
     *
     * @return QueueConnectionFactory
     *
     * @throws JMSException If unable to get QueueConnectionFactory
     */
    protected abstract ConnectionFactory getConnectionFactory()
            throws JMSException;
    /**
     * Should be implemented according to a specific JMS broker.
     *
     * @param destinationName the name of the wanted Queue
     *
     * @return The destination. Note that the implementation should make sure
     *         that this is a Queue or a Topic, as required by the
     *         NetarchiveSuite design. {@link Channels#isTopic(String)}
     *
     * @throws JMSException If unable to get a destination.
     */
    protected abstract Destination getDestination(String destinationName)
            throws JMSException;
    /**
     * Exceptionhandler for the JMSConnection. Implemented according to a
     * specific JMS broker. Should try to reconnect if at all possible.
     *
     * @param e a JMSException
     */
    public abstract void onException(JMSException e);
    /** Class constructor. */
    protected JMSConnection() {
    }
    /**
     * Initializes the JMS connection. Creates and starts connection and
     * session. Adds a shutdown hook that closes down JMSConnection. Adds this
     * object as ExceptionListener for the connection.
     *
     * @throws IOFailure if initialization fails.
     */
    protected void initConnection() throws IOFailure {
        log.debug("Initializing a JMS connection " + getClass().getName());
        connectionLock.writeLock().lock();
        try {
            int tries = 0;
            JMSException lastException = null;
            boolean operationSuccessful = false;
            while (!operationSuccessful && tries < JMS_MAXTRIES) {
                tries++;
                try {
                    establishConnectionAndSession();
                    operationSuccessful = true;
                } catch (JMSException e) {
                    closeConnection();
                    log.debug("Connect failed (try " + tries + ")", e);
                    lastException = e;
                    if (tries < JMS_MAXTRIES) {
                        log.debug("Will sleep a while before trying to"
                                  + " connect again");
                        TimeUtils.exponentialBackoffSleep(tries,
                                                          Calendar.MINUTE);
                    }
                }
            }
            if (!operationSuccessful) {
                log.warn("Could not initialize JMS connection "
                         + getClass(), lastException);
                cleanup();
                throw new IOFailure("Could not initialize JMS connection "
                                    + getClass(), lastException);
            }
            closeHook = new CleanupHook(this);
            Runtime.getRuntime().addShutdownHook(closeHook);
        } finally {
            connectionLock.writeLock().unlock();
        }
    }
    /**
     * Submit an object to the destination queue. This method cannot be
     * overridden. Override the method sendMessage to change functionality.
     *
     * @param msg The NetarkivetMessage to send to the destination queue (null
     *            not allowed)
     *
     * @throws ArgumentNotValid if nMsg is null.
     * @throws IOFailure        if the operation failed.
     */
    public final void send(NetarkivetMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");
        log.trace("Sending message (" + msg.toString() + ") to " + msg.getTo());
        sendMessage(msg, msg.getTo());
    }
    /**
     * Sends a message msg to the channel defined by the parameter to - NOT the
     * channel defined in the message.
     *
     * @param msg Message to be sent
     * @param to  The destination channel
     */
    public final void resend(NetarkivetMessage msg, ChannelID to) {
        ArgumentNotValid.checkNotNull(msg, "msg");
        ArgumentNotValid.checkNotNull(to, "to");
        log.trace("Resending message (" + msg.toString() + ") to "
                  + to.getName());
        sendMessage(msg, to);
    }
    /**
     * Submit an object to the reply queue.
     *
     * @param msg The NetarkivetMessage to send to the reply queue (null not
     *            allowed)
     *
     * @throws ArgumentNotValid if nMsg is null.
     * @throws PermissionDenied if message nMsg has not been sent yet.
     * @throws IOFailure        if unable to reply.
     */
    public final void reply(NetarkivetMessage msg) {
        ArgumentNotValid.checkNotNull(msg, "msg");
        log.trace("Reply on message (" + msg.toString() + ") to "
                  + msg.getReplyTo().getName());
        if (!msg.hasBeenSent()) {
            throw new PermissionDenied("Message has not been sent yet");
        }
        sendMessage(msg, msg.getReplyTo());
    }
    /**
     * Method adds a listener to the given queue or topic.
     *
     * @param mq the messagequeue to listen to
     * @param ml the messagelistener
     *
     * @throws IOFailure if the operation failed.
     */
    public void setListener(ChannelID mq, MessageListener ml) throws IOFailure {
        ArgumentNotValid.checkNotNull(mq, "ChannelID mq");
        ArgumentNotValid.checkNotNull(ml, "MessageListener ml");
        setListener(mq.getName(), ml);
    }
    /**
     * Removes the specified MessageListener from the given queue or topic.
     *
     * @param mq the given queue or topic
     * @param ml a messagelistener
     *
     * @throws IOFailure On network trouble
     */
    public void removeListener(ChannelID mq, MessageListener ml)
            throws IOFailure {
        ArgumentNotValid.checkNotNull(mq, "ChannelID mq");
        ArgumentNotValid.checkNotNull(ml, "MessageListener ml");
        removeListener(ml, mq.getName());
    }
    /**
     * Clean up: close the connection, remove the shutdown hook and null the
     * instance.
     */
    public void cleanup() {
        connectionLock.writeLock().lock();
        try {
            //Remove shutdown hook
            log.info("Starting cleanup");
            try {
                if (closeHook != null) {
                    Runtime.getRuntime().removeShutdownHook(closeHook);
                }
            } catch (IllegalStateException e) {
                //Okay, it just means we are already shutting down.
            }
            closeHook = null;
            //Close session
            closeConnection();
            //Clear list of listeners
            listeners.clear();
            instance = null;
            log.info("Cleanup finished");
        } finally {
            connectionLock.writeLock().unlock();
        }
    }
    /**
     * Close connection, session and listeners. Will ignore trouble, and simply
     * log it.
     */
    private void closeConnection() {
        // Close terminates all pending message received on the
        // connection's session's consumers.
        if (connection != null) { // close connection
            try {
                connection.close();
            } catch (JMSException e) {
                // Just ignore it
                log.warn("Error closing JMS Connection.", e);
            }
        }
        connection = null;
        session = null;
        consumers.clear();
        producers.clear();
    }
    /**
     * Unwraps a NetarkivetMessage from an ObjectMessage.
     *
     * @param msg a javax.jms.ObjectMessage
     *
     * @return a NetarkivetMessage
     *
     * @throws ArgumentNotValid when msg is invalid or the format of the JMS
     *                          ObjectMessage is invalid
     */
    public static NetarkivetMessage unpack(Message msg)
            throws ArgumentNotValid {
        ArgumentNotValid.checkNotNull(msg, "msg");
        ObjectMessage objMsg;
        try {
            objMsg = (ObjectMessage) msg;
        } catch (ClassCastException e) {
            log.warn("Invalid message type: " + msg.getClass());
            throw new ArgumentNotValid("Invalid message type: "
                                       + msg.getClass());
        }
        NetarkivetMessage netMsg;
        String classname = "Unknown class"; // for error reporting purposes
        try {
            classname = objMsg.getObject().getClass().getName();
            netMsg = (NetarkivetMessage) objMsg.getObject();
            // Note: Id is only updated if the message does not already have an
            // id. On unpack, this means the first time the message is received.
            netMsg.updateId(msg.getJMSMessageID());
        } catch (ClassCastException e) {
            log.warn("Invalid message type: " + classname, e);
            throw new ArgumentNotValid("Invalid message type: " + classname, e);
        } catch (Exception e) {
            String message = "Message invalid. Unable to unpack "
                             + "message: " + classname;
            log.warn(message, e);
            throw new ArgumentNotValid(message, e);
        }
        log.trace("Unpacked message '" + netMsg + "'");
        return netMsg;
    }
    /**
     * Submit an ObjectMessage to the destination channel.
     *
     * @param nMsg the NetarkivetMessage to be wrapped and send as an
     *             ObjectMessage
     * @param to   the destination channel
     *
     * @throws IOFailure if message failed to be sent.
     */
    protected void sendMessage(NetarkivetMessage nMsg, ChannelID to)
            throws IOFailure {
        Exception lastException = null;
        boolean operationSuccessful = false;
        int tries = 0;
        while (!operationSuccessful && tries < JMS_MAXTRIES) {
            tries++;
            try {
                doSend(nMsg, to);
                operationSuccessful = true;
            } catch (JMSException e) {
                log.debug("Send failed (try " + tries + ")", e);
                lastException = e;
                if (tries < JMS_MAXTRIES) {
                    onException(e);
                    log.debug("Will sleep a while before trying to send again");
                    TimeUtils.exponentialBackoffSleep(tries,
                                                      Calendar.MINUTE);
                }
            } catch (Exception e) {
                log.debug("Send failed (try " + tries + ")", e);
                lastException = e;
                if (tries < JMS_MAXTRIES) {
                    reconnect();
                    log.debug("Will sleep a while before trying to send again");
                    TimeUtils.exponentialBackoffSleep(tries,
                                                      Calendar.MINUTE);
                }
            }
        }
        if (!operationSuccessful) {
            log.warn("Send failed", lastException);
            throw new IOFailure("Send failed.", lastException);
        }
    }
    /**
     * Do a reconnect to the JMSbroker. Does absolutely nothing, if already in
     * the process of reconnecting.
     */
    protected void reconnect() {
        if (!connectionLock.writeLock().tryLock()) {
            log.debug("Reconnection already in progress. Do nothing");
            return;
        }
        try {
            log.info("Trying to reconnect to jmsbroker");
            boolean operationSuccessful = false;
            Exception lastException = null;
            int tries = 0;
            while (!operationSuccessful && tries < JMS_MAXTRIES) {
                tries++;
                try {
                    doReconnect();
                    operationSuccessful = true;
                } catch (Exception e) {
                    lastException = e;
                    log.debug("Reconnect failed (try " + tries + ")", e);
                    if (tries < JMS_MAXTRIES) {
                        log.debug("Will sleep a while before trying to"
                                  + " reconnect again");
                        TimeUtils.exponentialBackoffSleep(tries,
                                                          Calendar.MINUTE);
                    }
                }
            }
            if (!operationSuccessful) {
                log.warn("Reconnect to JMS broker failed",
                         lastException);
                closeConnection();
            }
        } finally {
            // Tell everybody, that we are not trying to reconnect any longer
            connectionLock.writeLock().unlock();
        }
    }
    /**
     * Helper method for getting the right producer for a queue or topic.
     *
     * @param queueName The name of the channel
     *
     * @return The producer for that channel. A new one is created, if none
     *         exists.
     *
     * @throws JMSException If a new producer cannot be created.
     */
    private MessageProducer getProducer(String queueName) throws JMSException {
        // Check if producer is in cache
        // If it is not, it is created and stored in cache:
        MessageProducer producer = producers.get(queueName);
        if (producer == null) {
            producer = getSession().createProducer(getDestination(queueName));
            producers.put(queueName, producer);
        }
        return producer;
    }
    /**
     * Get the session. Will try reconnecting if session is null.
     *
     * @return The session.
     *
     * @throws IOFailure if no session is available, and reconnect does not
     *                   help.
     */
    private Session getSession() {
        if (session == null) {
            reconnect();
        }
        if (session == null) {
            throw new IOFailure("Session not available");
        }
        return session;
    }
    /**
     * Helper method for getting the right consumer for a queue or topic, and
     * message listener.
     *
     * @param channelName The name of the channel
     * @param ml          The message listener to add as listener to the
     *                    channel
     *
     * @return The consumer for that channel. A new one is created, if none
     *         exists.
     *
     * @throws JMSException If a new consumer cannot be created.
     */
    private MessageConsumer getConsumer(String channelName, MessageListener ml)
            throws JMSException {
        String key = getConsumerKey(channelName, ml);
        MessageConsumer consumer = consumers.get(key);
        if (consumer == null) {
            consumer = getSession().createConsumer(getDestination(channelName));
            consumers.put(key, consumer);
            listeners.put(key, ml);
        }
        return consumer;
    }
    /**
     * Generate a consumerkey based on the given channel name and
     * messageListener.
     *
     * @param channel         Channel name
     * @param messageListener a messageListener
     *
     * @return the generated consumerkey.
     */
    protected static String getConsumerKey(
            String channel, MessageListener messageListener) {
        return channel + CONSUMER_KEY_SEPARATOR + messageListener;
    }
    /**
     * Get the channelName embedded in a consumerKey.
     *
     * @param consumerKey a consumerKey
     *
     * @return name of channel embedded in a consumerKey
     */
    private static String getChannelName(String consumerKey) {
        // assumes argument consumerKey was created using method getConsumerKey()
        return consumerKey.split(CONSUMER_KEY_SEPARATOR)[0];
    }
    /**
     * Helper method to establish one Connection and associated Session,
     *
     * @throws JMSException If some JMS error occurred during the creation of
     *                      the required JMS connection and session
     */
    private void establishConnectionAndSession() throws JMSException {
        // Establish a queue connection and a session
        connection = getConnectionFactory().createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.setExceptionListener(this);
        connection.start();
    }
    /**
     * Sends an ObjectMessage on a queue destination.
     *
     * @param msg the NetarkivetMessage to be wrapped and send as an
     *            ObjectMessage.
     * @param to  the destination topic.
     *
     * @throws JMSException if message failed to be sent.
     */
    private void doSend(NetarkivetMessage msg, ChannelID to)
            throws JMSException {
        connectionLock.readLock().lock();
        try {
            ObjectMessage message = getSession().createObjectMessage(msg);
            synchronized (msg) {
                getProducer(to.getName()).send(message);
                // Note: Id is only updated if the message does not already have
                // an id. This ensures that resent messages keep the same ID
                msg.updateId(message.getJMSMessageID());
            }
        } finally {
            connectionLock.readLock().unlock();
        }
        log.trace("Sent message '" + msg.toString() + "'");
    }
    /**
     * Method adds a listener to the given queue or topic.
     *
     * @param channelName the messagequeue to listen to
     * @param ml          the messagelistener
     *
     * @throws IOFailure if the operation failed.
     */
    private void setListener(String channelName, MessageListener ml) {
        log.debug("Adding " + ml.toString() + " as listener to "
                  + channelName);
        String errMsg = "JMS-error - could not add Listener to queue/topic: "
                        + channelName;
        int tries = 0;
        boolean operationSuccessful = false;
        Exception lastException = null;
        while (!operationSuccessful && tries < JMS_MAXTRIES) {
            tries++;
            try {
                connectionLock.readLock().lock();
                try {
                    getConsumer(channelName, ml).setMessageListener(ml);
                } finally {
                    connectionLock.readLock().unlock();
                }
                operationSuccessful = true;
            } catch (JMSException e) {
                lastException = e;
                log.debug("Set listener failed (try " + tries + ")", e);
                if (tries < JMS_MAXTRIES) {
                    onException(e);
                    log.debug("Will sleep a while before trying to set listener"
                              + " again");
                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
                }
            } catch (Exception e) {
                lastException = e;
                log.debug("Set listener failed (try " + tries + ")", e);
                if (tries < JMS_MAXTRIES) {
                    reconnect();
                    log.debug("Will sleep a while before trying to set listener"
                              + " again");
                    TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
                }
            }
        }
        if (!operationSuccessful) {
            log.warn(errMsg, lastException);
            throw new IOFailure(errMsg, lastException);
        }
    }
    /**
     * Remove a messagelistener from a channel (a queue or a topic).
     *
     * @param ml          A specific MessageListener
     * @param channelName a channelname
     */
    private void removeListener(MessageListener ml, String channelName) {
        String errMsg = "JMS-error - could not remove Listener from "
                        + "queue/topic: " + channelName;
        int tries = 0;
        Exception lastException = null;
        boolean operationSuccessful = false;
        log.info("Removing listener from channel '" + channelName + "'");
        while (!operationSuccessful && tries < JMS_MAXTRIES) {
            try {
                tries++;
                connectionLock.readLock().lock();
                try {
                    MessageConsumer messageConsumer = getConsumer(channelName,
                                                                  ml);
                    messageConsumer.close();
                    consumers.remove(getConsumerKey(channelName, ml));
                    listeners.remove(getConsumerKey(channelName, ml));
                } finally {
                    connectionLock.readLock().unlock();
                }
                operationSuccessful = true;
            } catch (JMSException e) {
                lastException = e;
                log.debug("Remove  listener failed (try " + tries + ")", e);
                onException(e);
                log.debug("Will and sleep a while before trying to remove"
                          + " listener again");
                TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
            } catch (Exception e) {
                lastException = e;
                log.debug("Remove  listener failed (try " + tries + ")", e);
                reconnect();
                log.debug("Will and sleep a while before trying to remove"
                          + " listener again");
                TimeUtils.exponentialBackoffSleep(tries, Calendar.MINUTE);
            }
        }
        if (!operationSuccessful) {
            log.warn(errMsg, lastException);
            throw new IOFailure(errMsg, lastException);
        }
    }
    /**
     * Reconnect to JMSBroker and reestablish session. Resets senders and
     * publishers.
     *
     * @throws JMSException If unable to reconnect to JMSBroker and/or
     *                      reestablish sessions
     */
    private void doReconnect()
            throws JMSException {
        closeConnection();
        establishConnectionAndSession();
        // Add listeners already stored in the consumers map
        log.debug("Re-add listeners");
        for (Map.Entry<String, MessageListener> listener
                : listeners.entrySet()) {
            setListener(getChannelName(listener.getKey()),
                        listener.getValue());
        }
        log.info("Reconnect successful");
    }
}

The above code may differ from the latest version in our repository: https://gforge.statsbiblioteket.dk/plugins/scmsvn/viewcvs.php/*checkout*/trunk/src/dk/netarkivet/common/distribute/JMSConnection.java?root=netarchivesuite (requires access to our code repository).
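
A broker-specific plugin only has to supply the three abstract methods listed above. The following sketch shows what such a subclass might look like if it were written against Apache ActiveMQ; it is purely illustrative, the broker URL is a placeholder, and only JMSConnectionSunMQ is supported in the default distribution.

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import dk.netarkivet.common.distribute.Channels;
import dk.netarkivet.common.distribute.JMSConnection;

/** Sketch of a hypothetical ActiveMQ-based JMSConnection implementation. */
public class JMSConnectionActiveMQ extends JMSConnection {
    /** Placeholder broker URL; a real plugin would read this from the settings. */
    private static final String BROKER_URL = "tcp://localhost:61616";

    @Override
    protected ConnectionFactory getConnectionFactory() throws JMSException {
        return new ActiveMQConnectionFactory(BROKER_URL);
    }

    @Override
    protected Destination getDestination(String destinationName)
            throws JMSException {
        // Channels.isTopic() decides whether the named channel is a topic or a queue.
        if (Channels.isTopic(destinationName)) {
            return new ActiveMQTopic(destinationName);
        } else {
            return new ActiveMQQueue(destinationName);
        }
    }

    @Override
    public void onException(JMSException e) {
        // On broker trouble, try to re-establish the connection and listeners.
        reconnect();
    }
}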

ArcRepositoryClient

The ArcRepositoryClient handles access to the Archive module, covering both upload and low-level access. Two implementations are included in the default distribution.

The ArcRepositoryClient interface is defined by the Java interface dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClient:

/**
 * Generic interface defining all methods that an ArcRepository provides.
 * Typically, an application using this will only see one of the restricted
 * superinterfaces.
 *
 */
public interface ArcRepositoryClient extends
        HarvesterArcRepositoryClient, ViewerArcRepositoryClient,
                PreservationArcRepositoryClient {
    /** Call on shutdown to release external resources. */
    void close();
    /**
     * Gets a single ARC record out of the ArcRepository.
     *
     * @param arcfile The name of a file containing the desired record.
     * @param index   The offset of the desired record in the file
     * @return a BitarchiveRecord-object, or null if request times out or object
     * is not found.
     * @throws IOFailure If the get operation failed.
     * @throws ArgumentNotValid if the given arcfile is null or empty string,
     * or the given index is negative.
     */
    BitarchiveRecord get(String arcfile, long index) throws ArgumentNotValid;
    /**
     * Retrieves a file from an ArcRepository and places it in a local file.
     * @param arcfilename Name of the arcfile to retrieve.
     * @param replica The bitarchive to retrieve the data from. On
     * implementations with only one replica, null may be used.
     * @param toFile Filename of a place where the file fetched can be put.
     * @throws IOFailure if there are problems getting a reply or the file
     * could not be found.
     */
    void getFile(String arcfilename, Replica replica, File toFile);
    /**
     * Store the given file in the ArcRepository.  After storing, the file is
     * deleted.
     *
     * @param file A file to be stored. Must exist.
     * @throws IOFailure thrown if store is unsuccessful, or failed to clean
     * up files after the store operation.
     * @throws ArgumentNotValid if file parameter is null or file is not an
     *                          existing file.
     */
    void store(File file) throws IOFailure, ArgumentNotValid;
    /**
     * Runs a batch job on each file in the ArcRepository.
     *
     * @param job An object that implements the FileBatchJob interface. The
     *  initialize() method will be called before processing and the finish()
     *  method will be called afterwards. The process() method will be called
     *  with each File entry.
     *
     * @param replicaId The archive to execute the job on.
     * @return The status of the batch job after it ended.
     */
    BatchStatus batch(FileBatchJob job, String replicaId);
    /** Updates the administrative data in the ArcRepository for a given
     * file and replica.
     *
     * @param fileName The name of a file stored in the ArcRepository.
     * @param bitarchiveId The id of the replica that the administrative
     * data for fileName is wrong for.
     * @param newval What the administrative data will be updated to.
     */
    void updateAdminData(String fileName, String bitarchiveId,
                         ReplicaStoreState newval);
    /** Updates the checksum kept in the ArcRepository for a given
     * file.  It is the responsibility of the ArcRepository implementation to
     * ensure that this checksum matches that of the underlying files.
     *
     * @param filename The name of a file stored in the ArcRepository.
     * @param checksum The new checksum.
     */
    void updateAdminChecksum(String filename, String checksum);
    /** Remove a file from one part of the ArcRepository, retrieving a copy
     * for security purposes.  This is typically used when repairing a file
     * that has been corrupted.
     *
     * @param fileName The name of the file to remove.
     * @param bitarchiveId The id of the replica from which to remove the file.
     * @param checksum The checksum of the file to be removed.
     * @param credentials A string that shows that the user is allowed to
     * perform this operation.
     * @return A local copy of the file removed.
     */
    File removeAndGetFile(String fileName, String bitarchiveId,
                          String checksum, String credentials);
}

The above code may differ from the latest version in our repository: https://gforge.statsbiblioteket.dk/plugins/scmsvn/viewcvs.php/*checkout*/trunk/src/dk/netarkivet/common/distribute/arcrepository/ArcRepositoryClient.java?root=netarchivesuite (requires access to our code repository).
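
The sketch below shows typical use of the client for storing a file and fetching a single record back. It is an illustration only; the file name and offset are made up, and obtaining the client instance is left out.

import java.io.File;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClient;
import dk.netarkivet.common.distribute.arcrepository.BitarchiveRecord;

/** Sketch of typical ArcRepositoryClient use; file name and offset are made up. */
public class ArcRepositoryClientSketch {
    /** Store a newly created ARC file and fetch one record back. */
    public static void example(ArcRepositoryClient client) {
        // Upload a file to the archive; the local copy is deleted after storing.
        client.store(new File("1-1-20100101000000-00000-example.arc"));

        // Fetch one record, identified by file name and offset within the file.
        BitarchiveRecord record =
                client.get("1-1-20100101000000-00000-example.arc", 2048L);
        if (record == null) {
            System.err.println("Record not found or request timed out");
        }

        // Release external resources when the client is no longer needed.
        client.close();
    }
}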

IndexClient

The IndexClient provides the Lucene indices that are used for deduplication and for viewerproxy access. It makes use of the ArcRepositoryClient to fetch data from the archive, and it implements several layers of caching of these data and of the Lucene indices created from them. It is advisable to perform regular clean-up of the cache directories.

The IndexClient interface is defined by the Java interface dk.netarkivet.common.distribute.indexserver.JobIndexCache:

/** An interface to a cache of data for jobs. */
public interface JobIndexCache {
    /** Get an index for the given list of job IDs.
     * The resulting file contains a suitably sorted list.
     * This method should always be safe for asynchronous calling.
     * This method may use a cached version of the file.
     *
     * @param jobIDs Set of job IDs to generate index for.
     * @return An index, consisting of a file and the set this is an index for.
     * This file must not be modified or deleted, since it is part of the cache
     * of data.
     */
    Index<Set<Long>> getIndex(Set<Long> jobIDs);
}

The above code may differ from the latest version in our repository: https://gforge.statsbiblioteket.dk/plugins/scmsvn/viewcvs.php/*checkout*/trunk/src/dk/netarkivet/common/distribute/indexserver/JobIndexCache.java?root=netarchivesuite (requires access to our code repository).
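
A minimal usage sketch, assuming a JobIndexCache instance has already been obtained; the accessor names used on the returned Index object are assumptions made for the example.

import java.io.File;
import java.util.HashSet;
import java.util.Set;
import dk.netarkivet.common.distribute.indexserver.Index;
import dk.netarkivet.common.distribute.indexserver.JobIndexCache;

/** Sketch of requesting a deduplication index for a set of jobs. */
public class JobIndexCacheSketch {
    public static void example(JobIndexCache cache) {
        Set<Long> jobIDs = new HashSet<Long>();
        jobIDs.add(1L); // made-up job IDs
        jobIDs.add(2L);

        // The result may come from the cache. The underlying file must not be
        // modified or deleted by the caller, since it is part of the cache.
        Index<Set<Long>> index = cache.getIndex(jobIDs);
        File indexFile = index.getIndexFile();       // the cached Lucene index
        Set<Long> coveredJobs = index.getIndexSet(); // the jobs the index covers
    }
}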

DBSpecifics

The DBSpecifics interface allows substitution of the database used to store harvest definitions. There are three implementations: one for MySQL, one for Derby running as a separate server, and one for embedded Derby. Which of these to choose is mostly a matter of individual preference. The embedded Derby implementation has been in use at the Danish web archive for over two years.

The interface is defined by the abstract Java class dk.netarkivet.harvester.datamodel.DBSpecifics:

/**
 * Abstract collection of DB methods that are not standard SQL.  This class
 * is a singleton class whose actual implementation is provided by a subclass
 * as determined by the DB_SPECIFICS_CLASS setting.
 *
 */
public abstract class DBSpecifics extends SettingsFactory<DBSpecifics> {

    /** The instance of the DBSpecifics class. */
    private static DBSpecifics instance;

    Log log = LogFactory.getLog(DBSpecifics.class);
    /** Get the singleton instance of the DBSpecifics implementation class.
     *
     * @return An instance of DBSpecifics with implementations for a given DB.
     */
    public static synchronized DBSpecifics getInstance() {
        if (instance == null) {
            instance = getInstance(CommonSettings.DB_SPECIFICS_CLASS);
        }
        return instance;
    }
    /**
     * Shutdown the database system, if running embedded.  Otherwise, this
     * is ignored.
     *
     * Will log a warning on errors, but otherwise ignore them.
     */
    public abstract void shutdownDatabase();
    /** Get a temporary table for short-time use.  The table should be
     * disposed of with dropTemporaryTable.  The table has two columns
     * domain_name varchar(Constants.MAX_NAME_SIZE)
     * config_name varchar(Constants.MAX_NAME_SIZE)
     * All rows in the table must be deleted at commit or rollback.
     *
     * @param c The DB connection to use.
     * @throws SQLException if there is a problem getting the table.
     * @return The name of the created table
     */
    public abstract String getJobConfigsTmpTable(Connection c)
    throws SQLException;
    /** Dispose of a temporary table obtained with getJobConfigsTmpTable. This can be
     * expected to be called from within a finally clause, so it mustn't throw
     * exceptions.
     *
     * @param c The DB connection to use.
     * @param tableName The name of the temporarily created table.
     */
    public abstract void dropJobConfigsTmpTable(Connection c, String tableName);
    /**
     * Backup the database.  For server-based databases, where the administrator
     * is expected to perform the backups, this method should do nothing.
     * This method gets called within one hour of the hour-of-day indicated
     * by the DB_BACKUP_INIT_HOUR settings.
     *
     * @param backupDir Directory to which the database should be backed up
     * @throws SQLException On SQL trouble backing up database
     * @throws PermissionDenied if the directory cannot be created.
     */
    public abstract void backupDatabase(File backupDir) throws SQLException;
    /** Get the name of the JDBC driver class that handles interfacing
     * to this server.
     *
     * @return The name of a JDBC driver class
     */
    public abstract String getDriverClassName();
    /** Update a table to a newer version, if necessary.  This will check the
     * schemaversions table to see the current version and perform a
     * table-specific update if required.
     *
     * @param tableName The table to update
     * @param toVersion The version to update the table to.
     * @throws IllegalState If the table is an unsupported version, and
     * the toVersion is less than the current version of the table
     * @throws NotImplementedException If no method exists for migration from
     * current version of the table to the toVersion of the table.
     * @throws IOFailure in case of problems in interacting with the database
     */

    public synchronized void updateTable(String tableName, int toVersion) {
        ArgumentNotValid.checkNotNullOrEmpty(tableName, "String tableName");
        ArgumentNotValid.checkPositive(toVersion, "int toVersion");
        int currentVersion = DBUtils.getTableVersion(
                DBConnect.getDBConnection(), tableName);
        log.info("Trying to migrate table '" + tableName + "' from version '"
                + currentVersion + "' to version '" + toVersion + "'.");
        if (currentVersion == toVersion) {
            // Nothing to do. Version of table is already correct.
            return;
        }
        if (currentVersion > toVersion) {
            throw new IllegalState("Database is in an illegalState: "
                    + "The current version of table '" + tableName
                    + "' is not acceptable "
                    + "(current version is greater than requested version).");
        }
        if (tableName.equals("jobs")) {
            if (currentVersion < 3) {
                throw new IllegalState("Database is in an illegalState: "
                        + "The current version " + currentVersion
                        + " of table '" + tableName + "' is not acceptable. "
                        + "(current version is less than open source version).");
            }
            if (currentVersion == 3 && toVersion >= 4) {
                migrateJobsv3tov4();
                currentVersion = 4;
            }
            if (currentVersion == 4 && toVersion >= 5) {
                migrateJobsv4tov5();
            }
            if (currentVersion == 5 && toVersion >= 6) {
                throw new NotImplementedException(
                        "No method exists for migrating table '" + tableName
                                + "' from version " + currentVersion
                                + " to version " + toVersion);
            }
            // future updates of the job table are inserted here
            if (currentVersion > 5) {
                throw new IllegalState("Database is in an illegalState: "
                        + "The current version (" + currentVersion
                        + ") of table '" + tableName
                        + "' is not an acceptable/known version. ");
            }
        } else if (tableName.equals("fullharvests")) {
            if (currentVersion < 2) {
                throw new IllegalState("Database is in an illegalState: "
                        + "The current version " + currentVersion
                        + " of table '" + tableName + "' is not acceptable. "
                        + "(current version is less than open source version).");
            }
            if (currentVersion == 2 && toVersion >= 3) {
                migrateFullharvestsv2tov3();
                currentVersion = 3;
            }
            if (currentVersion == 3 && toVersion >= 4) {
                throw new NotImplementedException(
                        "No method exists for migrating table '" + tableName
                                + "' from version " + currentVersion
                                + " to version " + toVersion);
            }
            // future updates of the job table are inserted here
            if (currentVersion > 4) {
                throw new IllegalState("Database is in an illegalState: "
                        + "The current version (" + currentVersion
                        + ") of table '" + tableName
                        + "' is not an acceptable/known version. ");
            }
        } else if (tableName.equals("configurations")) {
            if (currentVersion < 3) {
                throw new IllegalState("Database is in an illegalState: "
                        + "The current version " + currentVersion
                        + " of table '" + tableName + "' is not acceptable. "
                        + "(current version is less than open source version).");
            }
            if (currentVersion == 3 && toVersion >= 4) {
                migrateConfigurationsv3ov4();
                currentVersion = 4;
            }
            if (currentVersion == 4 && toVersion >= 5) {
                throw new NotImplementedException(
                        "No method exists for migrating table '" + tableName
                                + "' from version " + currentVersion
                                + " to version " + toVersion);
            }
            // future updates of the job table are inserted here
            if (currentVersion > 5) {
                throw new IllegalState("Database is in an illegalState: "
                        + "The current version (" + currentVersion
                        + ") of table '" + tableName
                        + "' is not an acceptable/known version. ");
            }
        } else if (tableName.equals("global_crawler_trap_lists")) {
            if (currentVersion == 0 && toVersion == 1) {
                createGlobalCrawlerTrapLists();
                currentVersion = 1;
            }
            if (currentVersion > 1) {
                throw new NotImplementedException(
                        "No method exists for migrating table '" + tableName
                                + "' from version " + currentVersion
                                + " to version " + toVersion);
            }
        } else if (tableName.equals("global_crawler_trap_expressions")) {
           if (currentVersion == 0 && toVersion == 1) {
                createGlobalCrawlerTrapExpressions();
                currentVersion = 1;
            }
            if (currentVersion > 1) {
                throw new NotImplementedException(
                        "No method exists for migrating table '" + tableName
                                + "' from version " + currentVersion
                                + " to version " + toVersion);
            }
        } else {
            // This includes cases where currentVersion < toVersion
            // for all tables that does not have migration functions yet
            throw new NotImplementedException(
                    "No method exists for migrating table '" + tableName
                            + "' to version " + toVersion);
        }
    }
    /** Migrates the 'jobs' table from version 3 to version 4
     * consisting of a change of the field forcemaxbytes from int to bigint
     * and setting its default to -1.
     * Furthermore the default value for field num_configs is set to 0.
     * @throws IOFailure in case of problems in interacting with the database
     */
    protected abstract void migrateJobsv3tov4();

    /** Migrates the 'jobs' table from version 4 to version 5
     * consisting of adding new fields 'resubmitted_as_job' and 'submittedDate'.
     * @throws IOFailure in case of problems in interacting with the database
     */
    protected abstract void migrateJobsv4tov5();

    /** Migrates the 'configurations' table from version 3 to version 4.
     * This consists of altering the default value of field 'maxbytes' to -1.
     */
    protected abstract void migrateConfigurationsv3ov4();

    /** Migrates the 'fullharvests' table from version 2 to version 3.
     * This consists of altering the default value of field 'maxbytes' to -1.
     */
    protected abstract void migrateFullharvestsv2tov3();
    /**
     * Creates the initial (version 1) of table 'global_crawler_trap_lists'.
     */
    protected abstract void createGlobalCrawlerTrapLists();
    /**
     * Creates the initial (version 1) of table
     * 'global_crawler_trap_expressions'.
     */
    protected abstract void createGlobalCrawlerTrapExpressions();
}

The above code may differ from the latest version in our repository: https://gforge.statsbiblioteket.dk/plugins/scmsvn/viewcvs.php/*checkout*/trunk/src/dk/netarkivet/harvester/datamodel/DBSpecifics.java?root=netarchivesuite (requires access to our code repository).
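
A short sketch of how application code interacts with the configured DBSpecifics instance. The concrete subclass is chosen through the CommonSettings.DB_SPECIFICS_CLASS setting mentioned in the class comment above; the surrounding class here is only an example.

import dk.netarkivet.harvester.datamodel.DBSpecifics;

/** Sketch of application code using the configured DBSpecifics instance. */
public class DBSpecificsSketch {
    public static void example() {
        // The concrete subclass (MySQL, Derby server or embedded Derby) is
        // selected by the DB_SPECIFICS_CLASS setting.
        DBSpecifics db = DBSpecifics.getInstance();

        // The JDBC driver to load depends on the chosen implementation.
        String driverClass = db.getDriverClassName();

        // Embedded databases are shut down explicitly; server-based databases
        // ignore this call.
        db.shutdownDatabase();
    }
}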

Notifications

The Notifications interface lets you choose how important error notifications are handled in your system. Two implementations exist: one that sends emails, and one that prints the messages to System.err. Adding more specialised plugins should be easy.

The Notifications interface is defined by the abstract class dk.netarkivet.common.utils.Notifications:

/**
 * This class encapsulates reacting to a serious error message.
 *
 */
public abstract class Notifications {
    /**
     * Notify about an error event. This is the same as calling
     * {@link #errorEvent(String, Throwable)} with null as the second parameter.
     *
     * @param message The error message to notify about.
     */
    public void errorEvent(String message) {
        errorEvent(message, null);
    }
    /**
     * Notifies about an error event including an exception.
     *
     * @param message The error message to notify about.
     * @param e       The exception to notify about.
     * May be null for no exception.
     */
    public abstract void errorEvent(String message, Throwable e);
}

The above code may differ from the latest version in our repository: https://gforge.statsbiblioteket.dk/plugins/scmsvn/viewcvs.php/*checkout*/trunk/src/dk/netarkivet/common/utils/Notifications.java?root=netarchivesuite (requires access to our code repository).
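
As noted above, adding a specialised notifications plugin mostly amounts to subclassing Notifications. The sketch below is an illustrative plugin that only writes notifications to the application log; it is not part of the distribution.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import dk.netarkivet.common.utils.Notifications;

/** Illustrative Notifications plugin that writes error events to the log. */
public class LogOnlyNotifications extends Notifications {
    /** The log. */
    private static final Log log = LogFactory.getLog(LogOnlyNotifications.class);

    @Override
    public void errorEvent(String message, Throwable e) {
        if (e != null) {
            log.error("Error notification: " + message, e);
        } else {
            log.error("Error notification: " + message);
        }
    }
}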

HeritrixController

The HeritrixController interface defines how we initialize a Heritrix instance and communicate with it while it runs. We have an implementation that starts Heritrix as a separate process and communicates with it using JMX (JMXHeritrixController), and a deprecated implementation with Heritrix embedded inside NetarchiveSuite (DirectHeritrixController), which controls Heritrix through a CrawlController instance.

The HeritrixController interface is defined by the Java interface dk.netarkivet.harvester.harvesting.HeritrixController:

/**
 * This interface encapsulates the direct access to Heritrix, allowing
 * for accessing in various ways (direct class access or JMX).  Heritrix is
 * expected to perform one crawl for each instance of an implementing class.
 *
 */
public interface HeritrixController {
    /** Initialize a new CrawlController for executing a Heritrix crawl.
     * This does not start the crawl.
     *
     */
    void initialize();
    /** Request that Heritrix start crawling.  When this method returns,
     * either Heritrix has failed in the early stages, or the crawljob has
     * been successfully created.  Actual crawling will commence at some
     * point hereafter.
     * @throws IOFailure If something goes wrong during startup.
     */
    void requestCrawlStart() throws IOFailure;
    /** Tell Heritrix to stop crawling.  Heritrix may take a while to actually
     * stop, so you cannot assume that crawling is stopped when this method
     * returns.
     */
    void beginCrawlStop();
    /** Request that crawling stops.
     * This makes a call to beginCrawlStop(), unless the crawler
     * is already shutting down. In that case it does nothing.
     *
     * @param reason A human-readable reason the crawl is being stopped.
     */
    void requestCrawlStop(String reason);
    /** Query whether Heritrix is in a state where it can finish crawling.
     *  Returns true if no uris remain to be harvested, or it has met
     *  either the maxbytes limit, the document limit,
     *  or the time-limit for the current harvest.
     *
     * @return True if Heritrix thinks it is time to stop crawling.
     */
    boolean atFinish();
    /** Returns true if the crawl has ended, either because Heritrix finished
     * or because we terminated it.
     *
     * @return True if the CrawlEnded event has happened in Heritrix,
     * indicating that all crawls have stopped.
     */
    boolean crawlIsEnded();
    /**
     * Get the number of currently active ToeThreads (crawler threads).
     * @return Number of ToeThreads currently active within Heritrix.
     */
    int getActiveToeCount();
    /** Get the number of URIs currently on the queue to be processed.  This
     * number may not be exact and should only be used in informal texts.
     *
     * @return How many URIs Heritrix have lined up for processing.
     */
    long getQueuedUriCount();
    /**
     * Get an estimate of the rate, in kb, at which documents
     * are currently being processed by the crawler.
     * @see org.archive.crawler.framework.StatisticsTracking#currentProcessedKBPerSec()
     * @return Number of KB data downloaded by Heritrix over an undefined
     * interval up to now.
     */
    int getCurrentProcessedKBPerSec();
    /** Get a human-readable set of statistics on the progress of the crawl.
     *  The statistics is
     *  discovered uris, queued uris, downloaded uris, doc/s(avg), KB/s(avg),
     *  dl-failures, busy-thread, mem-use-KB, heap-size-KB, congestion,
     *  max-depth, avg-depth.
     *  If no statistics are available, the string "No statistics available"
     *  is returned.
     *  Note: this method may disappear in the future.
     *
     * @return Some ascii-formatted statistics on the progress of the crawl.
     */
    String getProgressStats();
    /** Returns true if the crawler has been paused, and thus not
     * supposed to fetch anything.  Heritrix may still be fetching stuff, as
     * it takes some time for it to go into full pause mode.  This method can
     * be used as an indicator that we should not be worried if Heritrix
     * appears to be idle.
     *
     * @return True if the crawler has been paused, e.g. by using the
     * Heritrix GUI.
     */
    boolean isPaused();
    /** Release any resources kept by the class.
     */
    void cleanup();
    /**
     * Get harvest information. An example of this can be an URL pointing
     * to the GUI of a running Heritrix process.
     * @return information about the harvest process.
     */
    String getHarvestInformation();
}

The above code may differ from the latest version in our repository: https://gforge.statsbiblioteket.dk/plugins/scmsvn/viewcvs.php/*checkout*/trunk/src/dk/netarkivet/harvester/harvesting/HeritrixController.java?root=netarchivesuite (requires access to our code repository).
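
The sketch below shows the lifecycle a harvester typically drives a HeritrixController through. It is a simplified illustration; the polling loop and interval are assumptions, and the real harvester code is considerably more involved.

import dk.netarkivet.harvester.harvesting.HeritrixController;

/** Sketch of the lifecycle a harvester drives a HeritrixController through. */
public class HeritrixControllerSketch {
    public static void runCrawl(HeritrixController controller)
            throws InterruptedException {
        controller.initialize();        // set up the crawl, but do not start it
        controller.requestCrawlStart(); // ask Heritrix to begin crawling

        // Poll until Heritrix reports that the crawl has ended.
        while (!controller.crawlIsEnded()) {
            if (controller.atFinish()) {
                // A limit has been met; ask Heritrix to stop (this may take a while).
                controller.beginCrawlStop();
            }
            Thread.sleep(60 * 1000); // made-up polling interval
        }
        controller.cleanup(); // release resources held by the controller
    }
}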

ActiveBitPreservation

The ActiveBitPreservation interface defines how bit preservation actions are initiated from our GUI. There is a file-based and a database-based implementation. Both implementations communicate with the archive through the ArcRepository interface.

The ActiveBitPreservation interface is defined by the Java interface dk.netarkivet.archive.arcrepository.bitpreservation.ActiveBitPreservation:

/**
 * All bitpreservation implementations are assumed to have access to admin data
 * and bitarchives. Operations may request information from the bitarchive by
 * sending batch jobs, reading admin data directly, or reading from cached
 * information from either.
 */
public interface ActiveBitPreservation {
    // General state
    /**
     * Get details of the state of one or more files in the bitarchives
     * and admin data.
     *
     * @param filenames the list of filenames to investigate
     * @return a map ([filename]-> [FilePreservationState]) with the
     *  preservationstate of all files in the list.
     *  The preservationstates in the map will be null for all filenames,
     *  that are not found in admin data.
     */
    Map<String, PreservationState> getPreservationStateMap(
            String... filenames);
    /**
     * Get the details of the state of the given file in the bitarchives
     * and admin data.
     *
     * @param filename A given file
     * @return the FilePreservationState for the given file. This will be null,
     * if the filename is not found in admin data.
     */
    PreservationState getPreservationState(String filename);

    // Check state for bitarchives
    /**
     * Return a list of files marked as missing on this replica.
     * A file is considered missing if it exists in admin data, but is not known
     * in the bit archives. Guaranteed not to recheck the archive, simply
     * returns the list generated by the last test.
     *
     * @param replica The replica to get missing files from.
     * @return A list of missing files.
     */
    Iterable<String> getMissingFiles(Replica replica);
    /**
     * Return a list of files with changed checksums on this replica.
     * A file is considered changed if checksum does not compare to
     * admin data. Guaranteed not to recheck the archive, simply returns the
     * list generated by the last test.
     *
     * @param replica The replica to get a list of changed files from.
     * @return A list of files with changed checksums.
     */
    Iterable<String> getChangedFiles(Replica replica);
    /** Update the list of files in a given bitarchive. This will be used for
     * the next call to getMissingFiles.
     *
     * @param replica The replica to update list of files for.
     */
    void findMissingFiles(Replica replica);
    /** Update the list of checksums in a given replica. This will be used
     * for the next call to getChangedFiles.
     *
     * @param replica The replica to update list of files for.
     */
    void findChangedFiles(Replica replica);
    /**
     * Return the number of missing files for replica. Guaranteed not to
     * recheck the archive, simply returns the number generated by the last
     * test.
     *
     * @param replica The replica to get the number of missing files from.
     * @return The number of missing files.
     */
    long getNumberOfMissingFiles(Replica replica);
    /**
     * Return the number of changed files for replica. Guaranteed not to
     * recheck the archive, simply returns the number generated by the last
     * test.
     *
     * @param replica The replica to get the number of changed files from.
     * @return The number of changed files.
     */
    long getNumberOfChangedFiles(Replica replica);
    /**
     * Return the total number of files for replica. Guaranteed not to
     * recheck the archive, simply returns the number generated by the last
     * update.
     *
     * @param replica The replica to get the number of files from.
     * @return The number of files.
     */
    long getNumberOfFiles(Replica replica);
    /**
     * Return the date for last check of missing files for replica. Guaranteed
     * not to recheck the archive, simply returns the date for the
     * last test.
     *
     * @param replica The replica to get date for changed files from.
     * @return The date for last check of missing files.
     */
    Date getDateForMissingFiles(Replica replica);
    /**
     * Return the date for last check of changed files for replica. Guaranteed
     * not to recheck the archive, simply returns the date for the
     * last test.
     *
     * @param replica The replica to get date for changed files from.
     * @return The date for last check of changed files.
     */
    Date getDateForChangedFiles(Replica replica);
    // Update files in bitarchives
    /**
     * Check that files are indeed missing on the given replica, and present
     * in admin data and reference replica. If so, upload missing files from
     * reference replica to this replica.
     *
     * @param replica The replica to restore files to
     * @param filenames The names of the files.
     */
    void uploadMissingFiles(Replica replica, String... filenames);
    /**
     * Check that the checksum of the file is indeed different to the value in
     * admin data and reference replica. If so, remove missing file and upload
     * it from reference replica to this replica.
     *
     * @param replica The replica to restore file to
     * @param filename The name of the file
     * @param credentials The credentials used to perform this replace operation
     * @param checksum  The known bad checksum. Only a file with this bad
     * checksum is attempted repaired.
     */
    void replaceChangedFile(Replica replica, String filename,
                            String credentials, String checksum);
    // Check state for admin data
    /**
     * Return a list of files represented in replica but missing in AdminData.
     *
     * @return A list of missing files.
     */
    Iterable<String> getMissingFilesForAdminData();
    /**
     * Return a list of files with wrong checksum or state in admin data.
     *
     * @return A list of files with wrong checksum or state.
     */
    Iterable<String> getChangedFilesForAdminData();
    // Update admin data
    /**
     * Add files unknown in admin.data to admin.data.
     *
     * @param filenames The files to add.
     */
    void addMissingFilesToAdminData(String... filenames);
    /**
     * Reestablish admin data to match bitarchive states for file.
     *
     * @param filename The file to reestablish state for.
     */
    void changeStateForAdminData(String filename);
}

The above code may differ from the latest version in our repository: https://gforge.statsbiblioteket.dk/plugins/scmsvn/viewcvs.php/*checkout*/trunk/src/dk/netarkivet/archive/arcrepository/bitpreservation/ActiveBitPreservation.java?root=netarchivesuite (requires access to our code repository).
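
The sketch below illustrates how the GUI layer might use the interface to check for and re-upload missing files on a replica. It is an illustration only; the surrounding class and the control flow are made up for the example.

import dk.netarkivet.archive.arcrepository.bitpreservation.ActiveBitPreservation;
import dk.netarkivet.common.distribute.arcrepository.Replica;

/** Sketch of a missing-files check followed by a repair. */
public class BitPreservationSketch {
    public static void checkAndRepair(ActiveBitPreservation preservation,
                                      Replica replica) {
        // Refresh the cached list of files for the replica ...
        preservation.findMissingFiles(replica);

        // ... then inspect the result of that run.
        if (preservation.getNumberOfMissingFiles(replica) > 0) {
            for (String filename : preservation.getMissingFiles(replica)) {
                // Re-upload each missing file from a reference replica.
                preservation.uploadMissingFiles(replica, filename);
            }
        }
    }
}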
