Incremental data backups on Google App Engine using Fantasm and datastore namespaces.

Data reliability is essential for any application. Applications running on Google App Engine have well-protected data out of the box: Google provides redundant disks and data replication between data centres. You can rest assured that your data is safe from hardware failure.

However, a final risk remains: you. If you, or your team, develop a script with a bug, it can wreak havoc on your own data. Given tools and techniques like continuations and Map-Reduce, you can accidentally do widespread damage to your own data. Of course, things should be well tested, but there is always the possibility that something unexpected slips through. It remains vital to have a data backup, even in a safe environment like App Engine.

General approach to an incremental backup.

First, a word on my mindset: I’m of the opinion that Google protects my data from hardware failure and hackers, that is, they have considerably more resources on this front than I will ever have. As such, I really only need to keep snapshots of my data around in case my own code goes crazy so that I can recover. Since Google provides great performance and hardware redundancy, it makes an ideal home for my backup storage. So the balance of this article will focus on making an incremental backup of your data in the same App Engine application.

Fantasm, developed by VendAsta Technologies, is an open-source finite state machine engine for building taskqueue-based workflows on Google App Engine. In particular, it allows you to write resilient workflows with automatic retry mechanisms without worrying about the details of the highly flexible Task Queue API. Most importantly, Fantasm’s continuation feature makes it a piece of cake to walk across very large datasets – perfect for constructing a snapshot of your data. If you’re new to Fantasm, please start with this article.

Implementation.

To begin, we’ll construct a finite state machine for our backup in fsm.yaml:

- name: Backup

  context_types:
    lastBackup: datetime

  states:

  - name: SelectBackup
    initial: True
    action: SelectBackup
    transitions:
    - event: ok
      to: EnumerateBackupModels

  - name: EnumerateBackupModels
    continuation: True
    action: EnumerateBackupModels
    transitions:
    - event: ok
      to: BackupEntity

  - name: BackupEntity
    continuation: True
    action: BackupEntity
    final: True

There are three states in this machine, with a simple linear transition between each: SelectBackup → EnumerateBackupModels → BackupEntity.

In the first state, SelectBackup, we will do a bit of date math to determine which backup we are working with.

import datetime

FULL_BACKUP_ON_DAY_OF_WEEK = 0 # 0 = Monday, 1 = Tuesday, ...

class SelectBackup(object):

    def execute(self, context, obj, utcnow=None):
        utcnow = utcnow or datetime.datetime.utcnow()
        fullBackupDay = (utcnow - datetime.timedelta(days=utcnow.weekday())
                         + datetime.timedelta(days=FULL_BACKUP_ON_DAY_OF_WEEK))
        # the date above might be in the future; if so, back it up a week
        if fullBackupDay > utcnow:
            fullBackupDay = fullBackupDay - datetime.timedelta(days=7)
        context['backupId'] = 'backup-' + fullBackupDay.strftime('%Y-%m-%d')
        return 'ok'

The SelectBackup state selects the most recent Monday and places that date on the context as a string like backup-2011-03-21. Control then passes to the next state, EnumerateBackupModels.
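
To make the date math concrete, here is a quick sanity check that runs outside App Engine (plain Python; the date is chosen to match the example above):

import datetime

FULL_BACKUP_ON_DAY_OF_WEEK = 0 # Monday, as configured above

utcnow = datetime.datetime(2011, 3, 23) # a Wednesday; weekday() == 2
fullBackupDay = (utcnow - datetime.timedelta(days=utcnow.weekday())
                 + datetime.timedelta(days=FULL_BACKUP_ON_DAY_OF_WEEK))
if fullBackupDay > utcnow:
    fullBackupDay -= datetime.timedelta(days=7)
assert 'backup-' + fullBackupDay.strftime('%Y-%m-%d') == 'backup-2011-03-21'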

The EnumerateBackupModels state is demarcated as a “continuation” state. In this context, you can think of a continuation as a loop. However, because of the way Fantasm and the Task Queue work, each iteration of the loop executes in parallel. So, in EnumerateBackupModels, we loop through each of the models we want to back up and kick off parallel work for each.

BACKUP_CONFIG = (
    # MODEL       MODIFIED DATE      BACKUP BATCH SIZE
   (Account,      'modifiedTime',      1),
   (Company,      'modifiedTime',     10),
)

# create some mappings to make the later code more readable
BACKUP_CLASS = dict( (m[0].__name__, m[0]) for m in BACKUP_CONFIG )
BACKUP_INCREMENTAL_PROPERTY = dict( (m[0].__name__, m[1]) for m in BACKUP_CONFIG )
BACKUP_BATCH_SIZE = dict( (m[0].__name__, m[2]) for m in BACKUP_CONFIG )
BACKUP_MODELS = sorted(BACKUP_CLASS.keys())
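# With the BACKUP_CONFIG above, these lookup tables evaluate to (illustrative):
#   BACKUP_CLASS                == {'Account': Account, 'Company': Company}
#   BACKUP_INCREMENTAL_PROPERTY == {'Account': 'modifiedTime', 'Company': 'modifiedTime'}
#   BACKUP_BATCH_SIZE           == {'Account': 1, 'Company': 10}
#   BACKUP_MODELS               == ['Account', 'Company']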

class _Backup(db.Model):
   backupId = db.StringProperty(required=True)
   model = db.StringProperty(required=True)
   lastBackup = db.DateTimeProperty()

class EnumerateBackupModels(FSMAction):

   def continuation(self, context, obj, token=None):
       if not token:
           obj['model'] = BACKUP_MODELS[0]
           return BACKUP_MODELS[1] if len(BACKUP_MODELS) > 1 else None
       else:
           # find next in list
           for i in range(0, len(BACKUP_MODELS)):
               if BACKUP_MODELS[i] == token:
                   obj['model'] = BACKUP_MODELS[i]
                   return BACKUP_MODELS[i+1] if i < len(BACKUP_MODELS)-1 else None
       return None # occurs if token passed in is not found in list - shouldn't happen

   def execute(self, context, obj):
       backupId = context['backupId']
       model = obj['model']

       def tx():
           keyName = '%s:%s' % (backupId, model)
           entry = _Backup.get_by_key_name(keyName)

           if not entry:
               entry = _Backup(key_name=keyName, backupId=backupId,
                               model=model, lastBackup=None)
           else:
               context['lastBackup'] = entry.lastBackup # get the lastBackup time

           entry.lastBackup = datetime.datetime.utcnow() # update to now
           entry.put()

       db.run_in_transaction(tx)
       context['model'] = model
       return 'ok'

  • BACKUP_CONFIG This is simply the list of models we’re interested in backing up. We build some maps (BACKUP_CLASS, BACKUP_INCREMENTAL_PROPERTY, etc.) over this configuration to make the rest of the code more readable.
  • _Backup This model tracks the backup ID for each model we’ve backed up. That is, we can refer here to see what we’ve backed up in the past.
  • continuation The continuation method is simply responsible for returning a sequence of tokens, in this case the strings from BACKUP_MODELS. The token returned from one invocation of continuation is passed as an argument to the next invocation of continuation; we walk across the list in this way. Finally, our execute method needs to know which model we’re working on, so we store it on obj, making it available to the execute method. (A small simulation of this token walk follows this list.)
  • execute Using the backupId (from the SelectBackup state) and the model (from the continuation method), we ensure that we have an entity created in the _Backup tracking model. If a _Backup entity already existed for the given backupId, we add the time of the last backup to the context so that we can perform an incremental backup. Next, we update the entity’s lastBackup to now, since we’re working on a backup right now. Finally, we add the model to the context to make it available to subsequent machine states.
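
To see how the continuation chaining behaves, here is a small, hedged simulation; the driver loop is illustrative only (in a real deployment Fantasm runs each continuation step as its own task, in parallel), but it shows how tokens thread the machine across BACKUP_MODELS:

action = EnumerateBackupModels()
seen, token = [], None
while True:
    obj = {}
    token = action.continuation({}, obj, token=token)
    seen.append(obj['model']) # in practice, each model gets its own execute() task
    if token is None:
        break
assert seen == BACKUP_MODELS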

At this point, we have a machine in flight for each of the models in the BACKUP_MODELS list. All of these machines now move to the next (and final) state, BackupEntity. The BackupEntity state is also demarcated as a continuation state. Here, we query for each of the entities of the model in question and copy the data to a backup entity. Google App Engine namespaces (part of the Multitenancy feature) provide a very convenient place to store backup entities: we can name a namespace after the backup ID and simply store each entity into that namespace. This has the further benefit of not polluting our view of the data in the App Engine console data viewer.
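
Namespaces also make it easy to inspect a snapshot later. A minimal sketch, assuming the models from BACKUP_CONFIG and the example backup ID from earlier (namespace_manager is the standard App Engine API):

from google.appengine.api import namespace_manager

previous = namespace_manager.get_namespace()
try:
    namespace_manager.set_namespace('backup-2011-03-21') # the snapshot to inspect
    snapshotCount = Account.all().count() # queries run in the current namespace
finally:
    namespace_manager.set_namespace(previous) # always restore the original namespace

With that background, here is the BackupEntity state: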

class BackupEntity(DatastoreContinuationFSMAction):

    def getQuery(self, context, obj):
        model = context['model']
        query = 'SELECT * FROM %s' % model

        lastBackup = context.get('lastBackup')
        if lastBackup and BACKUP_INCREMENTAL_PROPERTY[model]:
            query = query + ' WHERE %s >= :1' % BACKUP_INCREMENTAL_PROPERTY[model]
            return db.GqlQuery(query, lastBackup)
        else:
            return db.GqlQuery(query)

    def getBatchSize(self, context, obj):
        model = context['model']
        batchSize = BACKUP_BATCH_SIZE[model]
        return batchSize

    def execute(self, context, obj):
        if not obj['results']:
            # the query may return no results; terminate if so
            return None

        model = context['model']
        backupId = context['backupId']
        entities = obj['results']

        backupEntities = []
        for originalEntity in entities:

            # build a key with the same path, but in a different namespace
            originalKeyPath = originalEntity.key().to_path()
            newKey = db.Key.from_path(*originalKeyPath, **{'namespace': backupId})

            # copy over the property values
            kwargs = {}
            for prop in originalEntity.properties().values():
                if isinstance(prop, (db.ReferenceProperty,
                                     blobstore.BlobReferenceProperty)):
                    # avoid the dereference/auto-lookup
                    datastoreValue = prop.get_value_for_datastore(originalEntity)
                else:
                    datastoreValue = getattr(originalEntity, prop.name, None)
                kwargs[prop.name] = datastoreValue

            backupModelClass = BACKUP_CLASS[model]
            backupEntity = backupModelClass(key=newKey, **kwargs)
            backupEntities.append(backupEntity)

        db.put(backupEntities)
  • getQuery Since BackupEntity inherits from DatastoreContinuationFSMAction, we don’t need to implement continuation; we only need to implement getQuery. getQuery uses the model on the context to construct a simple query that fetches all the entities for that model. If our configuration states that the current model has a modified timestamp (e.g., a db.DateTimeProperty(auto_now=True)), we extend the query to consider only entities that have been updated since the last backup time, i.e., an incremental backup. The parent class manages the query cursor, fetches the entities, and spins up a parallel execute invocation for each batch.
  • getBatchSize Simply tells the parent class how many entities to fetch at a time. The right number depends on the typical size of an entity for the current model; a full batch must fit within the datastore’s protocol-buffer size limit because we put() it in a single call in the execute method.
  • execute The execute method receives a list of entities on obj['results'] (via the DatastoreContinuationFSMAction parent class). Looping through these entities, it creates a backup entity holding the data from the original. Most importantly, it uses a slightly different key to place the backup entity in a different namespace, named for the backupId. If an entity with an identical key already exists, the put() will overwrite that existing entity with the new backup entity. (A note on why get_value_for_datastore is used for reference properties follows this list.)
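
A quick aside on why the code above uses get_value_for_datastore for reference properties: reading a ReferenceProperty attribute triggers an automatic datastore fetch of the referenced entity, whereas get_value_for_datastore hands back the raw key. A minimal sketch, using a hypothetical Employee model:

class Employee(db.Model):
    company = db.ReferenceProperty(Company)

employee = Employee.all().get() # assume at least one Employee exists
dereferenced = employee.company # issues an extra datastore get()
prop = Employee.properties()['company']
rawKey = prop.get_value_for_datastore(employee) # just the db.Key, no extra fetch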

We now have a finite state machine that will back up our entities as often as we invoke the machine. The machine rolls over to a new snapshot each week, and invocations between week boundaries perform incremental backups into the current snapshot. We can simply set up a schedule in cron.yaml to kick off our backups:

cron:
- description: Backup
  url: /fantasm/fsm/Backup/?method=POST
  schedule: every day 01:00

Building a machine for scrubbing old backups.

Constructing a finite state machine to scrub old backups is straightforward and yields a machine that is very similar to Backup:

- name: DeleteBackup

  context_types:
    daysOld: int
    backupDate: datetime

  states:

  - name: ComputeDate
    action: ComputeDate
    initial: True
    transitions:
    - event: ok
      to: SelectBackupToDelete

  - name: SelectBackupToDelete
    action: SelectBackupToDelete
    continuation: True
    final: True
    transitions:
    - event: ok
      to: DeleteBackupEntity

  - name: DeleteBackupEntity
    action: DeleteBackupEntity
    final: True
    continuation: True

Fantasm allows arguments to be passed into machines as standard GET or POST arguments. In the ComputeDate state, we convert a daysOld parameter into an absolute datetime and add it to the context as backupDate. Note that the context_types definition in the machine configuration above allows context['backupDate'] to be automatically cast to the correct data type in subsequent states.

class ComputeDate(object):

    def execute(self, context, obj):
        daysOld = context['daysOld'] # automatically cast to an int by context_types
        context['backupDate'] = (datetime.datetime.utcnow() -
                                 datetime.timedelta(days=daysOld))
        return 'ok'
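
We will trigger this machine with spawn later on, but it could equally be invoked directly over HTTP, with daysOld supplied in the query string (assuming the same Fantasm mount point as the cron entry above):

POST /fantasm/fsm/DeleteBackup/?daysOld=60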

The next state, SelectBackupToDelete, queries our _Backup model to find any backups that are older than backupDate.

class SelectBackupToDelete(DatastoreContinuationFSMAction):

   def getQuery(self, context, obj):
       return _Backup.all().filter('lastBackup <', context['backupDate'])

   def execute(self, context, obj):
       if not obj['result']:
           # we may get no result back at all, so we can terminate
           return None
       backupEntity = obj['result']
       context['model'] = backupEntity.model
       context['backupId'] = backupEntity.backupId
       db.delete(backupEntity)
       return 'ok'
  • getQuery As a DatastoreContinuationFSMAction, we don’t need to implement the continuation method; we only need to implement getQuery. Here, we simply build a query over _Backup for backups whose lastBackup date falls before backupDate. Because the default batch size is 1, the execute method will be called (in parallel) for each _Backup entity retrieved.
  • execute Using the _Backup entity retrieved, which DatastoreContinuationFSMAction stores on obj['result'], we add the model and backupId to the context. We then delete the entity from _Backup and pass control to the next state, which deletes the actual backup entities.

The next and final state, DeleteBackupEntity, is another continuation that walks across all the entities of the given model in the namespace named by backupId, and deletes them.

class DeleteBackupEntity(DatastoreContinuationFSMAction):

   def getQuery(self, context, obj):
       model = context['model']
       backupId = context['backupId']
       backupModelClass = BACKUP_CLASS[model]
       return backupModelClass.all(keys_only=True, namespace=backupId)

   def getBatchSize(self, context, obj):
       return 50

   def execute(self, context, obj):
       """ Actually delete the keys. """
       if obj['results']:
           db.delete(obj['results'])
  • getQuery Using the model and backupId from the previous state, we construct a query for all the entities, taking care to ensure the query is constrained to the right namespace. Also, since we’re just going to be deleting, we only need to retrieve the keys.
  • getBatchSize We can delete a number of entities at a time, e.g., 50.
  • execute The parent class DatastoreContinuationFSMAction places the results of the query on obj['results']; these results are a list of db.Key that we can pass directly to db.delete.

The only thing left is to invoke the DeleteBackup machine. We could use a cron.yaml entry to schedule it, but I’ll take the opportunity to highlight another Fantasm feature: spawn. spawn can be used to invoke other machines with given contexts. Calling spawn simply queues a task (actually a task for each context provided), so it can be called with little overhead. We can add the spawn call to the first state of our original Backup machine:

class SelectBackup(object):

    def execute(self, context, obj, utcnow=None):
        utcnow = utcnow or datetime.datetime.utcnow()
        fullBackupDay = (utcnow - datetime.timedelta(days=utcnow.weekday())
                         + datetime.timedelta(days=FULL_BACKUP_ON_DAY_OF_WEEK))
        # the date above might be in the future; if so, back it up a week
        if fullBackupDay > utcnow:
            fullBackupDay = fullBackupDay - datetime.timedelta(days=7)
        context['backupId'] = 'backup-' + fullBackupDay.strftime('%Y-%m-%d')

        context.spawn('DeleteBackup',
                      [{'daysOld': 60}], # one machine is spawned per context in the list
                      countdown=4*60*60) # start four hours from now

        return 'ok'

Note that spawn also allows us to specify a countdown, the number of seconds to wait before starting the spawned machine. In this case, we leave some time for our Backup machine to complete before kicking off the DeleteBackup machine (though, practically speaking, they could run at the same time).

The full source for this example can be found in the Fantasm project at http://code.google.com/p/fantasm.