• Skip to content
  • Skip to link menu
KDE 4.5 API Reference
  • KDE API Reference
  • KDE-PIM Libraries
  • Sitemap
  • Contact Us
 

akonadi

itemsync.cpp

00001 /*
00002     Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
00003     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00004 
00005     This library is free software; you can redistribute it and/or modify it
00006     under the terms of the GNU Library General Public License as published by
00007     the Free Software Foundation; either version 2 of the License, or (at your
00008     option) any later version.
00009 
00010     This library is distributed in the hope that it will be useful, but WITHOUT
00011     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00012     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00013     License for more details.
00014 
00015     You should have received a copy of the GNU Library General Public License
00016     along with this library; see the file COPYING.LIB.  If not, write to the
00017     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00018     02110-1301, USA.
00019 */
00020 
00021 #include "itemsync.h"
00022 
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "itemcreatejob.h"
00026 #include "itemdeletejob.h"
00027 #include "itemfetchjob.h"
00028 #include "itemmodifyjob.h"
00029 #include "transactionsequence.h"
00030 #include "itemfetchscope.h"
00031 
00032 #include <kdebug.h>
00033 
00034 #include <QtCore/QStringList>
00035 
00036 using namespace Akonadi;
00037 
00041 class ItemSync::Private
00042 {
00043   public:
00044     Private( ItemSync *parent ) :
00045       q( parent ),
00046       mTransactionMode( Single ),
00047       mCurrentTransaction( 0 ),
00048       mTransactionJobs( 0 ),
00049       mPendingJobs( 0 ),
00050       mProgress( 0 ),
00051       mTotalItems( -1 ),
00052       mTotalItemsProcessed( 0 ),
00053       mStreaming( false ),
00054       mIncremental( false ),
00055       mLocalListDone( false ),
00056       mDeliveryDone( false ),
00057       mFinished( false )
00058     {
00059       // we want to fetch all data by default
00060       mFetchScope.fetchFullPayload();
00061       mFetchScope.fetchAllAttributes();
00062     }
00063 
00064     void createLocalItem( const Item &item );
00065     void checkDone();
00066     void slotLocalListDone( KJob* );
00067     void slotLocalDeleteDone( KJob* );
00068     void slotLocalChangeDone( KJob* );
00069     void execute();
00070     void processItems();
00071     void deleteItems( const Item::List &items );
00072     void slotTransactionResult( KJob *job );
00073     Job* subjobParent() const;
00074 
00075     ItemSync *q;
00076     Collection mSyncCollection;
00077     QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00078     QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00079     QSet<Akonadi::Item> mUnprocessedLocalItems;
00080 
00081     // transaction mode, TODO: make this public API?
00082     enum TransactionMode {
00083       Single,
00084       Chunkwise,
00085       None
00086     };
00087     TransactionMode mTransactionMode;
00088     TransactionSequence *mCurrentTransaction;
00089     int mTransactionJobs;
00090 
00091     // fetch scope for initial item listing
00092     ItemFetchScope mFetchScope;
00093 
00094     // remote items
00095     Akonadi::Item::List mRemoteItems;
00096 
00097     // removed remote items
00098     Item::List mRemovedRemoteItems;
00099 
00100     // create counter
00101     int mPendingJobs;
00102     int mProgress;
00103     int mTotalItems;
00104     int mTotalItemsProcessed;
00105 
00106     bool mStreaming;
00107     bool mIncremental;
00108     bool mLocalListDone;
00109     bool mDeliveryDone;
00110     bool mFinished;
00111 };
00112 
00113 void ItemSync::Private::createLocalItem( const Item & item )
00114 {
00115   // don't try to do anything in error state
00116   if ( q->error() )
00117     return;
00118   mPendingJobs++;
00119   ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00120   q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00121 }
00122 
00123 void ItemSync::Private::checkDone()
00124 {
00125   q->setProcessedAmount( KJob::Bytes, mProgress );
00126   if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00127     return;
00128 
00129   if ( !mFinished ) { // prevent double result emission, can happen since checkDone() is called from all over the place
00130     mFinished = true;
00131     q->emitResult();
00132   }
00133 }
00134 
00135 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00136     Job( parent ),
00137     d( new Private( this ) )
00138 {
00139   d->mSyncCollection = collection;
00140 }
00141 
00142 ItemSync::~ItemSync()
00143 {
00144   delete d;
00145 }
00146 
00147 void ItemSync::setFullSyncItems( const Item::List &items )
00148 {
00149   Q_ASSERT( !d->mIncremental );
00150   if ( !d->mStreaming )
00151     d->mDeliveryDone = true;
00152   d->mRemoteItems += items;
00153   d->mTotalItemsProcessed += items.count();
00154   kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00155   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00156   if ( d->mTotalItemsProcessed == d->mTotalItems )
00157     d->mDeliveryDone = true;
00158   d->execute();
00159 }
00160 
00161 void ItemSync::setTotalItems( int amount )
00162 {
00163   Q_ASSERT( !d->mIncremental );
00164   Q_ASSERT( amount >= 0 );
00165   setStreamingEnabled( true );
00166   kDebug() << amount;
00167   d->mTotalItems = amount;
00168   setTotalAmount( KJob::Bytes, amount );
00169   if ( d->mTotalItems == 0 ) {
00170     d->mDeliveryDone = true;
00171     d->execute();
00172   }
00173 }
00174 
00175 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00176 {
00177   d->mIncremental = true;
00178   if ( !d->mStreaming )
00179     d->mDeliveryDone = true;
00180   d->mRemoteItems += changedItems;
00181   d->mRemovedRemoteItems += removedItems;
00182   d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00183   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00184   if ( d->mTotalItemsProcessed == d->mTotalItems )
00185     d->mDeliveryDone = true;
00186   d->execute();
00187 }
00188 
00189 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00190 {
00191   d->mFetchScope = fetchScope;
00192 }
00193 
00194 ItemFetchScope &ItemSync::fetchScope()
00195 {
00196   return d->mFetchScope;
00197 }
00198 
00199 void ItemSync::doStart()
00200 {
00201   ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00202   job->setFetchScope( d->mFetchScope );
00203 
00204   // we only can fetch parts already in the cache, otherwise this will deadlock
00205   job->fetchScope().setCacheOnly( true );
00206 
00207   connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) );
00208 }
00209 
00210 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00211 {
00212   // we are in error state, better not change anything at all anymore
00213   if ( error() )
00214     return false;
00215 
00216   /*
00217    * We know that this item has changed (as it is part of the
00218    * incremental changed list), so we just put it into the
00219    * storage.
00220    */
00221   if ( d->mIncremental )
00222     return true;
00223 
00224   // Check whether the flags differ
00225   if ( storedItem.flags() != newItem.flags() ) {
00226     kDebug() << "Stored flags "  << storedItem.flags()
00227              << "new flags " << newItem.flags();
00228     return true;
00229   }
00230 
00231   // Check whether the new item contains unknown parts
00232   QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
00233   missingParts.subtract( storedItem.loadedPayloadParts() );
00234   if ( !missingParts.isEmpty() )
00235     return true;
00236 
00237   // ### FIXME SLOW!!!
00238   // If the available part identifiers don't differ, check
00239   // whether the content of the payload differs
00240   if ( newItem.hasPayload()
00241     && storedItem.payloadData() != newItem.payloadData() )
00242     return true;
00243 
00244   // check if remote attributes have been changed
00245   foreach ( Attribute* attr, newItem.attributes() ) {
00246     if ( !storedItem.hasAttribute( attr->type() ) )
00247       return true;
00248     if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00249       return true;
00250   }
00251 
00252   return false;
00253 }
00254 
00255 void ItemSync::Private::slotLocalListDone( KJob * job )
00256 {
00257   if ( !job->error() ) {
00258     const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00259     foreach ( const Item &item, list ) {
00260       if ( item.remoteId().isEmpty() )
00261         continue;
00262       mLocalItemsById.insert( item.id(), item );
00263       mLocalItemsByRemoteId.insert( item.remoteId(), item );
00264       mUnprocessedLocalItems.insert( item );
00265     }
00266   }
00267 
00268   mLocalListDone = true;
00269   execute();
00270 }
00271 
00272 void ItemSync::Private::execute()
00273 {
00274   if ( !mLocalListDone )
00275     return;
00276 
00277   if ( (mTransactionMode == Single && !mCurrentTransaction) || mTransactionMode == Chunkwise ) {
00278     ++mTransactionJobs;
00279     mCurrentTransaction = new TransactionSequence( q );
00280     mCurrentTransaction->setAutomaticCommittingEnabled( false );
00281     connect( mCurrentTransaction, SIGNAL( result( KJob* ) ), q, SLOT( slotTransactionResult( KJob* ) ) );
00282   }
00283 
00284   processItems();
00285   if ( !mDeliveryDone ) {
00286     if ( mTransactionMode == Chunkwise && mCurrentTransaction ) {
00287       mCurrentTransaction->commit();
00288       mCurrentTransaction = 0;
00289     }
00290     return;
00291   }
00292 
00293   // removed
00294   if ( !mIncremental ) {
00295     mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00296     mUnprocessedLocalItems.clear();
00297   }
00298 
00299   deleteItems( mRemovedRemoteItems );
00300   mLocalItemsById.clear();
00301   mLocalItemsByRemoteId.clear();
00302   mRemovedRemoteItems.clear();
00303 
00304   if ( mCurrentTransaction ) {
00305     mCurrentTransaction->commit();
00306     mCurrentTransaction = 0;
00307   }
00308 
00309   checkDone();
00310 }
00311 
00312 void ItemSync::Private::processItems()
00313 {
00314   // added / updated
00315   foreach ( Item remoteItem, mRemoteItems ) { //krazy:exclude=foreach non-const is needed here
00316 #ifndef NDEBUG
00317     if ( remoteItem.remoteId().isEmpty() ) {
00318       kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
00319     }
00320 #endif
00321 
00322     Item localItem = mLocalItemsById.value( remoteItem.id() );
00323     if ( !localItem.isValid() )
00324       localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00325     mUnprocessedLocalItems.remove( localItem );
00326     // missing locally
00327     if ( !localItem.isValid() ) {
00328       createLocalItem( remoteItem );
00329       continue;
00330     }
00331 
00332     if ( q->updateItem( localItem, remoteItem ) ) {
00333       mPendingJobs++;
00334 
00335       remoteItem.setId( localItem.id() );
00336       remoteItem.setRevision( localItem.revision() );
00337       remoteItem.setSize( localItem.size() );
00338       remoteItem.setRemoteId( localItem.remoteId() );  // in case someone clears remoteId by accident
00339       ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00340       q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00341     } else {
00342       mProgress++;
00343     }
00344   }
00345   mRemoteItems.clear();
00346 }
00347 
00348 void ItemSync::Private::deleteItems( const Item::List &items )
00349 {
00350   // if in error state, better not change anything anymore
00351   if ( q->error() )
00352     return;
00353 
00354   Item::List itemsToDelete;
00355   foreach ( const Item &item, items ) {
00356     Item delItem( item );
00357     if ( !item.isValid() ) {
00358       delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00359     }
00360 
00361     if ( !delItem.isValid() ) {
00362 #ifndef NDEBUG
00363       kWarning() << "Delete item (remoteeId=" << delItem.remoteId()
00364                  << "mimeType=" << delItem.mimeType()
00365                  << ") does not have a valid UID and no item with that remote ID exists either";
00366 #endif
00367       continue;
00368     }
00369 
00370     if ( delItem.remoteId().isEmpty() ) {
00371       // don't attempt to remove items that never were written to the backend
00372       continue;
00373     }
00374 
00375     itemsToDelete.append ( delItem );
00376   }
00377 
00378   if ( !itemsToDelete.isEmpty() ) {
00379     mPendingJobs++;
00380     ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() );
00381     q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalDeleteDone( KJob* ) ) );
00382 
00383     // It can happen that the groupware servers report us deleted items
00384     // twice, in this case this item delete job will fail on the second try.
00385     // To avoid a rollback of the complete transaction we gracefully allow the job
00386     // to fail :)
00387     TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
00388     if ( transaction )
00389       transaction->setIgnoreJobFailure( job );
00390   }
00391 }
00392 
00393 void ItemSync::Private::slotLocalDeleteDone( KJob* )
00394 {
00395   mPendingJobs--;
00396   mProgress++;
00397 
00398   checkDone();
00399 }
00400 
00401 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00402 {
00403   Q_UNUSED( job );
00404   mPendingJobs--;
00405   mProgress++;
00406 
00407   checkDone();
00408 }
00409 
00410 void ItemSync::Private::slotTransactionResult( KJob *job )
00411 {
00412   --mTransactionJobs;
00413   if ( mCurrentTransaction == job )
00414     mCurrentTransaction = 0;
00415 
00416   checkDone();
00417 }
00418 
00419 Job * ItemSync::Private::subjobParent() const
00420 {
00421   if ( mCurrentTransaction && mTransactionMode != None )
00422     return mCurrentTransaction;
00423   return q;
00424 }
00425 
00426 void ItemSync::setStreamingEnabled(bool enable)
00427 {
00428   d->mStreaming = enable;
00429 }
00430 
00431 void ItemSync::deliveryDone()
00432 {
00433   Q_ASSERT( d->mStreaming );
00434   d->mDeliveryDone = true;
00435   d->execute();
00436 }
00437 
00438 void ItemSync::slotResult(KJob* job)
00439 {
00440   if ( job->error() ) {
00441     // pretent there were no errors
00442     Akonadi::Job::removeSubjob( job );
00443     // propagate the first error we got but continue, we might still be fed with stuff from a resource
00444     if ( !error() ) {
00445       setError( job->error() );
00446       setErrorText( job->errorText() );
00447     }
00448   } else {
00449     Akonadi::Job::slotResult( job );
00450   }
00451 }
00452 
00453 void ItemSync::rollback()
00454 {
00455   setError( UserCanceled );
00456   if ( d->mCurrentTransaction )
00457     d->mCurrentTransaction->rollback();
00458   d->mDeliveryDone = true; // user wont deliver more data
00459   d->execute(); // end this in an ordered way, since we have an error set no real change will be done
00460 }
00461 
00462 
00463 #include "itemsync.moc"

akonadi

Skip menu "akonadi"
  • Main Page
  • Modules
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members
  • Related Pages

KDE-PIM Libraries

Skip menu "KDE-PIM Libraries"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kblog
  • kcal
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Generated for KDE-PIM Libraries by doxygen 1.7.1
This website is maintained by Adriaan de Groot and Allen Winter.
KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal