Monday, January 09, 2012

Managing Actions in Simple Storage Service with Hayloft

Hayloft provides an object-oriented interface to the Amazon Web Services (AWS) Simple Storage Service (S3) in C++. When I started working on Hayloft, I had several goals. I wanted to learn something about cloud computing in general, because I believe that's the direction everything is going, and, as someone else once said, "Amazon.com is the Coke of cloud computing, and there is no Pepsi". I wanted to learn something about S3 in particular, because I have a background in mass storage systems and S3 might be the biggest one in the world. I wanted to write some software, because I'm one of those people who can only learn by doing. And I wanted to write it in C++, because I wanted to be able to use that software in both small embedded systems and honking big distributed systems (it turns out the skill sets are the same), which is where I've spent most of my career as a developer.

I didn't want to just slam out some code to talk to S3. I was interested in learning how to manage the individual operations, or what Hayloft refers to as Actions, using some of the patterns I've exploited in the past in the kinds of systems I've made a good living developing. I'll show you some real, running code straight from the Hayloft tarball (specifically from example.cpp) that illustrates some of the different ways applications can use buckets and objects in S3 through Hayloft. In S3 parlance, a bucket can be thought of as a web site with a unique domain name, and an object as a flat file that a web browser can retrieve from that web site. Each of these examples performs exactly the same sequence of S3 actions:

  • create BUCKET1 and BUCKET2 in S3;
  • put a local file unittest.txt into BUCKET1/OBJECT1 in S3;
  • copy BUCKET1/OBJECT1 to BUCKET2/OBJECT2 in S3;
  • get BUCKET2/OBJECT2 from S3 into a second temporary local file;
  • delete BUCKET1/OBJECT1 and BUCKET2/OBJECT2 in S3;
  • delete BUCKET1 and BUCKET2 in S3;
  • diff the first and second local files to verify they are the same.

Apart from some surrounding code that includes the necessary header files and sets the environment variables from which Hayloft gets parameters like my access key ID and secret access key (the credentials Hayloft uses to authenticate my application to AWS), the examples below show all of the running code in example.cpp. Each fragment executes inside an enclosing block, which is what the break statements bail out of on the first error. Below is a sketch of one plausible shape for that surround.
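This is only a hedged sketch, not the actual surround from example.cpp: the header, the environment variable names, and the credential strings are placeholders for the real ones, which you can find in the tarball.

#include <stdlib.h>

int main(void) {
    // Hypothetical variable names: Hayloft's actual credential variables
    // are documented in example.cpp and the unit test suite.
    ::setenv("HAYLOFT_ACCESS_KEY_ID", "my-access-key-id", !0);
    ::setenv("HAYLOFT_SECRET_ACCESS_KEY", "my-secret-access-key", !0);
    do {
        // One of the example fragments below goes here. Each break
        // statement exits this do/while block on the first error.
    } while (false);
    return 0;
}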

The first example executes all actions synchronously. There is no need to start actions or wait for actions to complete. Each action is started in its C++ constructor, and the constructor blocks until that action completes. All processing is done in the calling thread.

const char * PATH = "unittest.txt";
Bucket BUCKET1("Synchronicity1");
Bucket BUCKET2("Synchronicity2");
Object OBJECT1("Synchronicity1.txt", BUCKET1);
Object OBJECT2("Synchronicity2.txt", BUCKET2);
PathInput input(PATH);
Octets length = size(PATH);
PathOutput output(OBJECT2.getKey());

BucketCreate bucketcreate1(BUCKET1);
BucketCreate bucketcreate2(BUCKET2);
if (!bucketcreate1.isSuccessful()) { break; }
if (!bucketcreate2.isSuccessful()) { break; }

ObjectPut objectput1(OBJECT1, input, length);
if (!objectput1.isSuccessful()) { break; }

ObjectCopy objectcopy(OBJECT1, OBJECT2);
if (!objectcopy.isSuccessful()) { break; }

ObjectGet objectget2(OBJECT2, output);
if (!objectget2.isSuccessful()) { break; }

ObjectDelete objectdelete1(OBJECT1);
ObjectDelete objectdelete2(OBJECT2);
if (!objectdelete1.isSuccessful()) { break; }
if (!objectdelete2.isSuccessful()) { break; }

BucketDelete bucketdelete1(BUCKET1);
BucketDelete bucketdelete2(BUCKET2);
if (!bucketdelete1.isSuccessful()) { break; }
if (!bucketdelete2.isSuccessful()) { break; }

std::string command = "diff ";
command += PATH;
command += " ";
command += OBJECT2.getKey();
if (std::system(command.c_str()) != 0) { break; } /* non-zero: diff failed or files differ */
if (::unlink(OBJECT2.getKey()) < 0) { break; }

There is no code here to recover from errors, although the extensive unit test suite for Hayloft, based on Google Test, includes examples of doing exactly that using this paradigm. If you want to get started playing with S3 in C++ using Hayloft, this is the absolute simplest way to do so. As in all the examples, local files are handled using Desperado input and output functors; the PathInput and PathOutput functors provide an abstract mechanism to read and write data in the local file system. To give you a flavor of what recovery might look like in this paradigm, a small sketch follows.
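This sketch is mine, not the unit test suite's: the retry bound, the back-off, and the strategy of reconstructing the action to re-execute it are all assumptions for illustration.

#include <unistd.h>

bool created = false;
for (int ii = 0; ii < 3; ++ii) { // arbitrary retry limit
    BucketCreate bucketcreate(BUCKET1); // the constructor re-executes the action
    if (bucketcreate.isSuccessful()) { created = true; break; }
    ::sleep(1); // arbitrary back-off between attempts
}
if (!created) { break; } // give up and propagate the failure as before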

S3 actions can also be managed by using a different C++ constructor that specifies a manager, or what Hayloft generically refers to as a Plex. The simplest manager is Simplex, which is a bit of a cheat: it still uses the synchronous interface, but actions are not automatically started in the constructor. You must start each action explicitly, but when the start method returns, the action has completed. All actions are still executed in the calling thread. The important differences from the prior example are the Simplex argument to each action's constructor and the explicit calls to start.

const char * PATH = "unittest.txt";
Bucket BUCKET1("Simplexity1");
Bucket BUCKET2("Simplexity2");
Object OBJECT1("Simplexity1.txt", BUCKET1);
Object OBJECT2("Simplexity2.txt", BUCKET2);
PathInput input(PATH);
Octets length = size(PATH);
PathOutput output(OBJECT2.getKey());
Simplex simplex;

BucketCreate bucketcreate1(BUCKET1, simplex);
BucketCreate bucketcreate2(BUCKET2, simplex);
bucketcreate1.start();
bucketcreate2.start();
if (!bucketcreate1.isSuccessful()) { break; }
if (!bucketcreate2.isSuccessful()) { break; }

ObjectPut objectput1(OBJECT1, simplex, input, length);
objectput1.start();
if (!objectput1.isSuccessful()) { break; }

ObjectCopy objectcopy(OBJECT1, OBJECT2, simplex);
objectcopy.start();
if (!objectcopy.isSuccessful()) { break; }

ObjectGet objectget2(OBJECT2, simplex, output);
objectget2.start();
if (!objectget2.isSuccessful()) { break; }

ObjectDelete objectdelete1(OBJECT1, simplex);
ObjectDelete objectdelete2(OBJECT2, simplex);
objectdelete1.start();
objectdelete2.start();
if (!objectdelete1.isSuccessful()) { break; }
if (!objectdelete2.isSuccessful()) { break; }

BucketDelete bucketdelete1(BUCKET1, simplex);
bucketdelete1.start();
BucketDelete bucketdelete2(BUCKET2, simplex);
bucketdelete2.start();
if (!bucketdelete1.isSuccessful()) { break; }
if (!bucketdelete2.isSuccessful()) { break; }

std::string command = "diff ";
command += PATH;
command += " ";
command += OBJECT2.getKey();
if (std::system(command.c_str()) != 0) { break; }
if (::unlink(OBJECT2.getKey()) < 0) { break; }

As before, there is no error recovery, although such recovery is important when using S3 or any other web-based or internet-based service. The most common errors will be failures to connect to the service, which can be due to connectivity problems at either end of the connection, or indeed anywhere in between.

This next example uses a Multiplex to manage the actions, and it is the first to use the asynchronous interface. Because actions are executed asynchronously, multiple actions can share the same Multiplex and be executed in parallel. Each action must be started, and the processing of each action must be explicitly driven to completion. In this example I use the Multiplex complete method, which automatically drives all pending actions to completion. I'll show other approaches in subsequent examples.

const char * PATH = "unittest.txt";
Bucket BUCKET1("MultiplexityComplete1");
Bucket BUCKET2("MultiplexityComplete2");
Object OBJECT1("MultiplexityComplete1.txt", BUCKET1);
Object OBJECT2("MultiplexityComplete2.txt", BUCKET2);
PathInput input(PATH);
Octets length = size(PATH);
PathOutput output(OBJECT2.getKey());
Multiplex multiplex;

BucketCreate bucketcreate1(BUCKET1, multiplex);
BucketCreate bucketcreate2(BUCKET2, multiplex);
bucketcreate1.start();
bucketcreate2.start();
multiplex.complete();
if (!bucketcreate1.isSuccessful()) { break; }
if (!bucketcreate2.isSuccessful()) { break; }

ObjectPut objectput1(OBJECT1, multiplex, input, length);
objectput1.start();
multiplex.complete();
if (!objectput1.isSuccessful()) { break; }

ObjectCopy objectcopy(OBJECT1, OBJECT2, multiplex);
objectcopy.start();
multiplex.complete();
if (!objectcopy.isSuccessful()) { break; }

ObjectGet objectget2(OBJECT2, multiplex, output);
objectget2.start();
multiplex.complete();
if (!objectget2.isSuccessful()) { break; }

ObjectDelete objectdelete1(OBJECT1, multiplex);
ObjectDelete objectdelete2(OBJECT2, multiplex);
objectdelete1.start();
objectdelete2.start();
multiplex.complete();
if (!objectdelete1.isSuccessful()) { break; }
if (!objectdelete2.isSuccessful()) { break; }

BucketDelete bucketdelete1(BUCKET1, multiplex);
bucketdelete1.start();
BucketDelete bucketdelete2(BUCKET2, multiplex);
bucketdelete2.start();
multiplex.complete();
if (!bucketdelete1.isSuccessful()) { break; }
if (!bucketdelete2.isSuccessful()) { break; }

std::string command = "diff ";
command += PATH;
command += " ";
command += OBJECT2.getKey();
if (std::system(command.c_str()) != 0) { break; }
if (::unlink(OBJECT2.getKey()) < 0) { break; }

This approach is useful for testing, but in practice it offers no huge advantage over the prior examples beyond the ability to run some actions in parallel, which this example does.

This next example also uses a Multiplex, but calls its service method to incrementally drive the underlying state machines to completion.

const char * PATH = "unittest.txt";
Bucket BUCKET1("MultiplexityService1");
Bucket BUCKET2("MultiplexityService2");
Object OBJECT1("MultiplexityService1.txt", BUCKET1);
Object OBJECT2("MultiplexityService2.txt", BUCKET2);
PathInput input(PATH);
Octets length = size(PATH);
PathOutput output(OBJECT2.getKey());
Multiplex multiplex;

BucketCreate bucketcreate1(BUCKET1, multiplex);
BucketCreate bucketcreate2(BUCKET2, multiplex);
bucketcreate1.start();
bucketcreate2.start();
while (multiplex.service() > 0) { }
if (!bucketcreate1.isSuccessful()) { break; }
if (!bucketcreate2.isSuccessful()) { break; }

ObjectPut objectput1(OBJECT1, multiplex, input, length);
objectput1.start();
while (multiplex.service() > 0) { }
if (!objectput1.isSuccessful()) { break; }

ObjectCopy objectcopy(OBJECT1, OBJECT2, multiplex);
objectcopy.start();
while (multiplex.service() > 0) { }
if (!objectcopy.isSuccessful()) { break; }

ObjectGet objectget2(OBJECT2, multiplex, output);
objectget2.start();
while (multiplex.service() > 0) { }
if (!objectget2.isSuccessful()) { break; }

ObjectDelete objectdelete1(OBJECT1, multiplex);
ObjectDelete objectdelete2(OBJECT2, multiplex);
objectdelete1.start();
objectdelete2.start();
while (multiplex.service() > 0) { }
if (!objectdelete1.isSuccessful()) { break; }
if (!objectdelete2.isSuccessful()) { break; }

BucketDelete bucketdelete1(BUCKET1, multiplex);
bucketdelete1.start();
BucketDelete bucketdelete2(BUCKET2, multiplex);
bucketdelete2.start();
while (multiplex.service() > 0) { }
if (!bucketdelete1.isSuccessful()) { break; }
if (!bucketdelete2.isSuccessful()) { break; }

std::string command = "diff ";
command += PATH;
command += " ";
command += OBJECT2.getKey();
if (std::system(command.c_str()) != 0) { break; }
if (::unlink(OBJECT2.getKey()) < 0) { break; }

I've basically just reimplemented the complete method by calling the service method iteratively until it reports that there is no more work to be done. But an application could instead time-slice calls to the service method with other work. The service method has a timed select system call embedded in it, so it relinquishes the processor while waiting for the sockets to S3 to become ready. A sketch of such time-slicing follows.
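Just a sketch of the idea: doSomeOtherWork is a hypothetical application function, not part of Hayloft.

objectget2.start();
while (multiplex.service() > 0) {
    doSomeOtherWork(); // application work interleaved between service slices
}
if (!objectget2.isSuccessful()) { break; }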

The next example is the first one in which you'll see multi-threading. The Complex manager has a service thread that runs in the background and executes all pending actions. Instead of starting each action itself, the application submits each action to Complex and then waits on a POSIX thread condition variable specific to that action. All action processing is done in the Complex thread. No matter how many Complex objects you create, there is just one Complex thread doing all the work, and many actions may be executed in parallel in the background by that one thread.

const char * PATH = "unittest.txt";
Bucket BUCKET1("ComplexityWait1");
Bucket BUCKET2("ComplexityWait2");
Object OBJECT1("ComplexityWait1.txt", BUCKET1);
Object OBJECT2("ComplexityWait2.txt", BUCKET2);
PathInput input(PATH);
Octets length = size(PATH);
PathOutput output(OBJECT2.getKey());
Complex complex;

BucketCreate bucketcreate1(BUCKET1, complex);
complex.start(bucketcreate1);
BucketCreate bucketcreate2(BUCKET2, complex);
complex.start(bucketcreate2);
complex.wait(bucketcreate1);
complex.wait(bucketcreate2);
if (!bucketcreate1.isSuccessful()) { break; }
if (!bucketcreate2.isSuccessful()) { break; }

ObjectPut objectput1(OBJECT1, complex, input, length);
complex.start(objectput1);
complex.wait(objectput1);
if (!objectput1.isSuccessful()) { break; }

ObjectCopy objectcopy(OBJECT1, OBJECT2, complex);
complex.start(objectcopy);
complex.wait(objectcopy);
if (!objectcopy.isSuccessful()) { break; }

ObjectGet objectget2(OBJECT2, complex, output);
complex.start(objectget2);
complex.wait(objectget2);
if (!objectget2.isSuccessful()) { break; }

ObjectDelete objectdelete1(OBJECT1, complex);
complex.start(objectdelete1);
ObjectDelete objectdelete2(OBJECT2, complex);
complex.start(objectdelete2);
complex.wait(objectdelete1);
complex.wait(objectdelete2);
if (!objectdelete1.isSuccessful()) { break; }
if (!objectdelete2.isSuccessful()) { break; }

BucketDelete bucketdelete1(BUCKET1, complex);
complex.start(bucketdelete1);
BucketDelete bucketdelete2(BUCKET2, complex);
complex.start(bucketdelete2);
complex.wait(bucketdelete1);
complex.wait(bucketdelete2);
if (!bucketdelete1.isSuccessful()) { break; }
if (!bucketdelete2.isSuccessful()) { break; }

std::string command = "diff ";
command += PATH;
command += " ";
command += OBJECT2.getKey();
if (std::system(command.c_str()) < 0) { break; }
if (::unlink(OBJECT2.getKey()) < 0) { break; }

Unless you have some strenuous objection to multi-threading, this is the approach you should use in a real application. The background service thread does a lot more than just execute the actions. If an action fails for any reason deemed to be recoverable, like a network connection failure, the background service thread automatically retries it, after an appropriate back-off interval to keep the application from overloading the network, S3, or the local processor. The number of retries for any one action is limited, so execution of each action is guaranteed to eventually terminate. Once the application thread wakes up from its wait, the action will have either completed (successfully or with an unrecoverable error) or exceeded its retry limit.

One thing the background service thread cannot do by itself is know how to rewind the input or output functors if, for example, the network connection fails in the middle of an ObjectPut or ObjectGet. Hayloft's way around that is to let you override the reset method in ObjectPut or ObjectGet to rewind the underlying data source or sink. In the case of PathInput and PathOutput, this just means closing and reopening the underlying local file. Although I don't show that code here, you can find it in example.cpp; it's trivial to implement in an application. A hypothetical sketch of the idea follows.
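This sketch is only to convey the shape of the idea: the signature of reset here is my guess, not Hayloft's actual declaration, and the real implementation in example.cpp may differ in detail. The point is simply that the override discards the exhausted functor and reopens the file from the beginning.

class RewindablePut : public ObjectPut {
public:
    RewindablePut(const Object & object, Complex & complex, Input & input, Octets length, const char * path)
    : ObjectPut(object, complex, input, length)
    , path(path)
    {}
protected:
    // Assumed hook: invoked by the retry machinery when the data source
    // must be rewound; returns a fresh functor positioned at the start of
    // the underlying local file (ownership is assumed to pass to the framework).
    virtual Input * reset() {
        return new PathInput(path);
    }
private:
    const char * path;
};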

Hayloft currently doesn't offer a timed wait. I just haven't convinced myself it's either necessary or a good idea. But you can poll actions to see if the background service thread has completed them. In fact, if you don't mind burning a lot of processor cycles, you can dispense with the wait on the condition variable completely and just poll the actions until they complete.

const char * PATH = "unittest.txt";
Bucket BUCKET1("ComplexityPoll1");
Bucket BUCKET2("ComplexityPoll2");
Object OBJECT1("ComplexityPoll1.txt", BUCKET1);
Object OBJECT2("ComplexityPoll2.txt", BUCKET2);
PathInput input(PATH);
Octets length = size(PATH);
PathOutput output(OBJECT2.getKey());
Complex complex;

BucketCreate bucketcreate1(BUCKET1, complex);
complex.start(bucketcreate1);
BucketCreate bucketcreate2(BUCKET2, complex);
complex.start(bucketcreate2);
while (bucketcreate1 != true) { Thread::yield(); }
while (bucketcreate2 != true) { Thread::yield(); }
if (!bucketcreate1.isSuccessful()) { break; }
if (!bucketcreate2.isSuccessful()) { break; }

ObjectPut objectput1(OBJECT1, complex, input, length);
complex.start(objectput1);
while (objectput1 != true) { Thread::yield(); }
if (!objectput1.isSuccessful()) { break; }

ObjectCopy objectcopy(OBJECT1, OBJECT2, complex);
complex.start(objectcopy);
while (objectcopy != true) { Thread::yield(); }
if (!objectcopy.isSuccessful()) { break; }

ObjectGet objectget2(OBJECT2, complex, output);
complex.start(objectget2);
while (objectget2 != true) { Thread::yield(); }
if (!objectget2.isSuccessful()) { break; }

ObjectDelete objectdelete1(OBJECT1, complex);
complex.start(objectdelete1);
ObjectDelete objectdelete2(OBJECT2, complex);
complex.start(objectdelete2);
while (objectdelete1 != true) { Thread::yield(); }
while (objectdelete2 != true) { Thread::yield(); }
if (!objectdelete1.isSuccessful()) { break; }
if (!objectdelete2.isSuccessful()) { break; }

BucketDelete bucketdelete1(BUCKET1, complex);
complex.start(bucketdelete1);
BucketDelete bucketdelete2(BUCKET2, complex);
complex.start(bucketdelete2);
while (bucketdelete1 != true) { Thread::yield(); }
while (bucketdelete2 != true) { Thread::yield(); }
if (!bucketdelete1.isSuccessful()) { break; }
if (!bucketdelete2.isSuccessful()) { break; }

std::string command = "diff ";
command += PATH;
command += " ";
command += OBJECT2.getKey();
if (std::system(command.c_str()) < 0) { break; }
if (::unlink(OBJECT2.getKey()) < 0) { break; }

The polling queries a state variable inside each action; the variable is protected by a mutex and accessed via an overloaded C++ boolean cast operator. In miniature, the mechanism looks something like the sketch below.
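This is a sketch of the pattern, not Hayloft's actual code; the names and the use of raw POSIX mutex calls are mine.

#include <pthread.h>

class ActionSketch {
public:
    ActionSketch() : completed(false) { ::pthread_mutex_init(&mutex, 0); }
    ~ActionSketch() { ::pthread_mutex_destroy(&mutex); }
    // The overloaded boolean cast: queries the completion state under
    // the protection of the mutex.
    operator bool() {
        ::pthread_mutex_lock(&mutex);
        bool done = completed;
        ::pthread_mutex_unlock(&mutex);
        return done;
    }
    // Called by the service thread when the action finishes.
    void complete() {
        ::pthread_mutex_lock(&mutex);
        completed = true;
        ::pthread_mutex_unlock(&mutex);
    }
private:
    pthread_mutex_t mutex;
    bool completed;
};

With such an operator, a polling loop like while (action != true) { Thread::yield(); } amounts to repeated mutex-protected reads of that completion flag.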

It took me the better part of two solid months to slam out the code for Hayloft, including its extensive unit test suite. But once I did, writing these examples and getting them all running took just a morning, with occasional breaks to check out the latest LOLcats.

Even if you don't want to use Hayloft, or C++, there are many other libraries around for using S3 in other languages, or indeed for using other cloud-based services. I believe information technology is rapidly evolving into a model in which mobile wireless devices and their applications (driven by consumer-side economies of scale) act as front ends to very large distributed cloud based services (driven by information technology-side economies of scale). If you have a background in developing for embedded or distributed systems, I suspect that you are going to find your skills map well to those required for this kind of work.
