001    /**
002     * 
003     * Copyright 2004 Hiram Chirino
004     * 
005     * Licensed under the Apache License, Version 2.0 (the "License"); 
006     * you may not use this file except in compliance with the License. 
007     * You may obtain a copy of the License at 
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, 
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
014     * See the License for the specific language governing permissions and 
015     * limitations under the License. 
016     * 
017     **/
018    package org.activemq.store.jdbc.adapter;
019    
020    import java.io.ByteArrayOutputStream;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.OutputStream;
024    import java.sql.Blob;
025    import java.sql.Connection;
026    import java.sql.PreparedStatement;
027    import java.sql.ResultSet;
028    import java.sql.SQLException;
029    
030    import javax.jms.JMSException;
031    
032    import org.activemq.store.jdbc.StatementProvider;
033    
034    
035    /**
036     * This JDBCAdapter inserts and extracts BLOB data using the 
037     * getBlob()/setBlob() operations.  This is a little more involved
038     * since to insert a blob you have to:
039     * 
040     *  1: insert empty blob.
041     *  2: select the blob 
042     *  3: finally update the blob with data value. 
043     * 
044     * The databases/JDBC drivers that use this adapter are:
045     * <ul>
046     * <li></li> 
047     * </ul>
048     * 
049     * @version $Revision: 1.1 $
050     */
051    public class BlobJDBCAdapter extends DefaultJDBCAdapter {
052    
053        public BlobJDBCAdapter() {
054            super();
055        }
056    
057        public BlobJDBCAdapter(StatementProvider provider) {
058            super(provider);
059        }
060        
061        public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException,
062                JMSException {
063            PreparedStatement s = null;
064            ResultSet rs = null;
065            try {
066                
067                // Add the Blob record.
068                s = c.prepareStatement(statementProvider.getAddMessageStatment());
069                s.setLong(1, seq);
070                s.setString(2, destinationName);
071                s.setString(3, messageID);
072                s.setString(4, " ");
073                
074                if (s.executeUpdate() != 1)
075                    throw new JMSException("Failed to broker message: " + messageID
076                            + " in container.");
077                s.close();
078    
079                // Select the blob record so that we can update it.
080                s = c.prepareStatement(statementProvider.getFindMessageStatment());
081                s.setLong(1, seq);
082                rs = s.executeQuery();
083                if (!rs.next())
084                    throw new JMSException("Failed to broker message: " + messageID
085                            + " in container.");
086    
087                // Update the blob
088                Blob blob = rs.getBlob(1);
089                OutputStream stream = blob.setBinaryStream(data.length);
090                stream.write(data);
091                stream.close();
092                s.close();
093    
094                // Update the row with the updated blob
095                s = c.prepareStatement(statementProvider.getUpdateMessageStatment());
096                s.setBlob(1, blob);
097                s.setLong(2, seq);
098    
099            } catch (IOException e) {
100                throw (SQLException) new SQLException("BLOB could not be updated: "
101                        + e).initCause(e);
102            } finally {
103                try {
104                    rs.close();
105                } catch (Throwable e) {
106                }
107                try {
108                    s.close();
109                } catch (Throwable e) {
110                }
111            }
112        }
113        
114        public byte[] doGetMessage(Connection c, long seq) throws SQLException {
115                PreparedStatement s=null; ResultSet rs=null;
116                try {
117                    
118                    s = c.prepareStatement(statementProvider.getFindMessageStatment());
119                    s.setLong(1, seq); 
120                    rs = s.executeQuery();
121                    
122                    if( !rs.next() )
123                        return null;
124                    Blob blob = rs.getBlob(1);
125                    InputStream is = blob.getBinaryStream();
126                    
127                    ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());               
128                    int ch;
129                    while( (ch=is.read())>= 0 ) {
130                        os.write(ch);
131                    }
132                    is.close();
133                    os.close();
134                    
135                    return os.toByteArray();
136                    
137                } catch (IOException e) {
138                throw (SQLException) new SQLException("BLOB could not be updated: "
139                        + e).initCause(e);
140            } finally {
141                    try { rs.close(); } catch (Throwable e) {}
142                    try { s.close(); } catch (Throwable e) {}
143                }
144        }
145    
146    }