Tag Archives: ServiceBus

Windows Azure AppFabric Queues with Native C++

I’ve been playing around with writing native C++ lately. As a proof of concept I wrote a short sample that demonstrates how to access the Windows Azure AppFabric v2 Queues (topics coming soon!).

For a little context see the following:
Interview with the AppFabric Team regarding Queues and Topics, and
Service Bus Messaging Features

The sample below uses WinHTTP as a very basic REST client and performs the basic operations of creating a queue and writing messages to it. It could easily be extended to do some of the more advanced messaging functionality in the v2 release.

[sourcecode language="cpp"]
// QueueClient.h
#pragma once
#include <string>
#include "WinHTTPREST.h"
using namespace std;

// Simple aggregate that carries a label string together with a message-body string.
// NOTE(review): not referenced by the QueueClient declaration that follows -
// confirm whether it is still needed.
struct MessageProperties
{
    std::string Label;
    std::string MessageBody;
};

// REST client for the queues of one service namespace.
// Holds a WinHTTPREST connection plus the status code and body that the
// request methods pass to WinHTTPREST::SendRequest.
class QueueClient
{
private:
    WinHTTPREST * _rest;        // owning pointer; deleted in the destructor
    string _serviceAddress;     // host name built from the service namespace
    DWORD _httpResponseCode;    // status code written by SendRequest
    string _httpResponse;       // response body written by SendRequest

    // _rest is deleted in the destructor, so a copied QueueClient would delete
    // the same WinHTTPREST twice. Copying is disabled by declaring (and never
    // defining) the copy operations.
    QueueClient(const QueueClient &);
    QueueClient & operator=(const QueueClient &);
public:
    QueueClient(string serviceNameSpace);
    ~QueueClient(void);

    // Each request method takes the value for the Authorization header.
    BOOL CreateQueue(string queueName, string authHeader);
    BOOL DeleteItem(string itemName, string authHeader);
    BOOL SubmitMessageToQueue(string queueName, string authHeader, string label, string messageBody);
    string GetMessageFromQueue(string queueName, string authHeader);

    // Accessors for the stored response body and status code.
    string GetHttpResponse();
    DWORD GetHTTPResponseCode();
};

[sourcecode language="cpp"]
// QueueClient.cpp
#include "StdAfx.h"
#include "QueueClient.h"
#include <vector>
#include <Windows.h>

// Builds the host name "<namespace>.servicebus.appfabriclabs.com" and creates
// the WinHTTPREST helper used for every request.
QueueClient::QueueClient(string serviceNamespace)
    : _rest(NULL),
      _serviceAddress(serviceNamespace + ".servicebus.appfabriclabs.com"),
      _httpResponseCode(0)   // defined value before the first request is made
{
    _rest = new WinHTTPREST(_serviceAddress, TRUE);
}

// Destroys the WinHTTPREST helper created by the constructor.
QueueClient::~QueueClient(void)
{
    // Deleting a null pointer is a no-op, so no explicit guard is needed.
    delete _rest;
}

// Creates a queue by PUTting an Atom entry to the relative URI <queueName>.
//   queueName  - queue name; also used as the request path and the entry title
//   authHeader - value sent in the Authorization header
// Returns TRUE only when the request was sent and the status code is 201.
// The status code and body remain available through the accessors.
BOOL QueueClient::CreateQueue(string queueName, string authHeader)
{
    // Build the XML that describes the queue.
    // Other QueueDescription properties could be set here:
    // http://msdn.microsoft.com/en-us/library/gg278338.aspx#BKMK_REST5
    // NOTE(review): queueName is inserted into the XML without escaping.
    const string putData = "<entry xmlns=\"http://www.w3.org/2005/Atom\"><title type=\"text\">" + queueName + "</title>"
        + "<content type=\"application/xml\">"
        + "<QueueDescription xmlns:i=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns=\"http://schemas.microsoft.com/netservices/2010/10/servicebus/connect\" />"
        + "</content></entry>";

    vector<string> headers;
    headers.push_back("Authorization: " + authHeader);
    headers.push_back("Content-Type: application/atom+xml");

    // Reset the stored response so results of earlier calls cannot leak into this one.
    _httpResponse.clear();
    _httpResponseCode = 0;

    const BOOL bRequest = _rest->SendRequest("PUT", queueName, _httpResponseCode, _httpResponse, headers, putData);

    return (bRequest != FALSE && _httpResponseCode == 201) ? TRUE : FALSE;
}

// Reference for header/property mappings
// Additional properties could be set through adding headers
// http://msdn.microsoft.com/en-us/library/gg278338.aspx
// POSTs messageBody to <queueName>/Messages.
//   authHeader - value sent in the Authorization header
//   label      - value sent in the X-MS-LABEL header
// Returns TRUE only when the request was sent and the status code is 201.
BOOL QueueClient::SubmitMessageToQueue(string queueName, string authHeader,string label, string messageBody)
{
    const string sendAddress = queueName + "/Messages";

    vector<string> headers;
    headers.push_back("Authorization: " + authHeader);
    headers.push_back("X-MS-LABEL: " + label);

    // Reset the stored response so results of earlier calls cannot leak into this one.
    _httpResponse.clear();
    _httpResponseCode = 0;

    // Pass the string itself rather than c_str(): converting through a C string
    // would cut the body at the first embedded NUL byte.
    const BOOL bRequest = _rest->SendRequest("POST", sendAddress, _httpResponseCode, _httpResponse, headers, messageBody);

    return (bRequest != FALSE && _httpResponseCode == 201) ? TRUE : FALSE;
}

// Read and remove from the queue
// Issues DELETE on <queueName>/Messages/Head?timeout=30 and returns the
// response body. The success of the request is not reflected in the return
// value; callers can inspect GetHTTPResponseCode(), which stays 0 if
// SendRequest never stores a status code.
string QueueClient::GetMessageFromQueue(string queueName, string authHeader)
{
    const string receiveAddress = queueName + "/Messages/Head?timeout=30";

    vector<string> headers;
    headers.push_back("Authorization: " + authHeader);

    // Without this reset, the body of every earlier request would be returned
    // in front of the newly received message.
    _httpResponse.clear();
    _httpResponseCode = 0;

    _rest->SendRequest("DELETE", receiveAddress, _httpResponseCode, _httpResponse, headers, "");
    return _httpResponse;
}

string QueueClient::GetHttpResponse()
{
return _httpResponse;
}

// Returns the status code currently held in _httpResponseCode.
DWORD QueueClient::GetHTTPResponseCode()
{
    return this->_httpResponseCode;
}

// Issues DELETE on the relative URI `uri` with the given Authorization value.
// Returns TRUE only when the request was sent and the status code is 200.
BOOL QueueClient::DeleteItem(string uri, string authHeader)
{
    vector<string> headers;
    headers.push_back("Authorization: " + authHeader);

    // Reset the stored response so results of earlier calls cannot leak into this one.
    _httpResponse.clear();
    _httpResponseCode = 0;

    // A DELETE carries no request body.
    const BOOL bRequest = _rest->SendRequest("DELETE", uri, _httpResponseCode, _httpResponse, headers, "");

    return (bRequest != FALSE && _httpResponseCode == 200) ? TRUE : FALSE;
}

Here is the WinHTTP REST wrapper:
[sourcecode language="cpp"]
// WinHTTPRest.h
#pragma once
#include <string>
#include <Windows.h>
#include <winhttp.h>
#include <string>
#include <sstream>
#include <iomanip>
#include <vector>
using namespace std;

// Helper class for REST
// Minimal REST helper built on WinHTTP: one session handle and one connection
// handle to a single server, plus SendRequest overloads.
class WinHTTPREST
{
private:
    string _serverName;      // host passed to WinHttpConnect
    string _httpProtocol;    // protocol version string passed to WinHttpOpenRequest
    HINTERNET _hSession;     // from WinHttpOpen; closed in the destructor
    HINTERNET _hConnect;     // from WinHttpConnect; closed in the destructor
    BOOL _secureSession;     // stored from the constructor argument

    // The destructor closes both handles, so a copied object would close them
    // twice. Copying is disabled by declaring (and never defining) the copy
    // operations.
    WinHTTPREST(const WinHTTPREST &);
    WinHTTPREST & operator=(const WinHTTPREST &);

    std::wstring StringToWString(const std::string& s);
    std::string WStringToString(const std::wstring& s);
    // NOTE(review): no definition of hex2dec, UriEncode or UriDecode as
    // WinHTTPREST members is visible next to the other member definitions -
    // confirm they exist before calling them.
    int hex2dec( char hex );
public:
    WinHTTPREST(string ServerName, BOOL SecureSession, string HTTPProtocol="HTTP/1.1");
    ~WinHTTPREST(void);

    // Sends Verb to relativeUri; the status code and response body are returned
    // through dwStatusCode and httpResponse. The shorter overloads forward to
    // the last one with no extra headers and/or no body.
    BOOL SendRequest(string Verb, string relativeUri, DWORD &dwStatusCode, string &httpResponse);
    BOOL SendRequest(string Verb, string relativeUri,DWORD &dwStatusCode, string &httpResponse,string PostData);
    BOOL SendRequest(string Verb, string relativeUri, DWORD &dwStatusCode, string &httpResponse, vector<string> headers, string PostData);
    std::string UriEncode(const std::string& input);
    std::string UriDecode(const std::string& sSrc);
};

[sourcecode language="cpp"]
// WinHTTPRest.cpp
#include "StdAfx.h"
#include "WinHTTPREST.h"
#include <vector>

// Opens a WinHTTP session and a connection to ServerName.
// If either call fails, the corresponding handle stays NULL.
// NOTE(review): SecureSession is stored, but the connection always uses
// INTERNET_DEFAULT_HTTPS_PORT regardless of its value.
WinHTTPREST::WinHTTPREST(string ServerName, BOOL SecureSession, string HTTPProtocol)
    : _serverName(ServerName),
      _httpProtocol(HTTPProtocol),
      _hSession(NULL),
      _hConnect(NULL),       // must be defined even when WinHttpOpen fails
      _secureSession(SecureSession)
{
    _hSession = WinHttpOpen(L"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)",
                            WINHTTP_ACCESS_TYPE_DEFAULT_PROXY,
                            WINHTTP_NO_PROXY_NAME,
                            WINHTTP_NO_PROXY_BYPASS, 0);

    // Specify an HTTP server.
    if( _hSession )
    {
        _hConnect = WinHttpConnect( _hSession,
                                    StringToWString(_serverName).c_str(),
                                    INTERNET_DEFAULT_HTTPS_PORT,
                                    0 );
    }
}

// Closes the WinHTTP handles opened by the constructor.
WinHTTPREST::~WinHTTPREST(void)
{
    // Close the child (connection) handle before the session it was created from.
    if( _hConnect )
    {
        WinHttpCloseHandle( _hConnect );
        _hConnect = NULL;
    }
    if( _hSession )
    {
        WinHttpCloseHandle( _hSession );
        _hSession = NULL;
    }
}

// Convenience overload: forwards to the full overload with no extra headers
// and an empty body.
BOOL WinHTTPREST::SendRequest(string Verb, string relativeUri, DWORD &dwStatusCode, string &httpResponse)
{
    const vector<string> emptyHeaders;
    const string emptyBody;
    return SendRequest(Verb, relativeUri, dwStatusCode, httpResponse, emptyHeaders, emptyBody);
}

// Convenience overload: forwards to the full overload with the given body and
// no extra headers.
BOOL WinHTTPREST::SendRequest(string Verb, string relativeUri,DWORD &dwStatusCode, string &httpResponse,string Data)
{
    const vector<string> emptyHeaders;
    return SendRequest(Verb, relativeUri, dwStatusCode, httpResponse, emptyHeaders, Data);
}

// Appends the complete response body of hRequest to `body`.
// Returns FALSE as soon as a WinHTTP call fails; the caller reads GetLastError().
static BOOL ReadResponseBody(HINTERNET hRequest, string &body)
{
    for(;;)
    {
        // Check for available data; zero means the body has been fully read.
        DWORD dwAvailable = 0;
        if(!WinHttpQueryDataAvailable(hRequest, &dwAvailable))
            return FALSE;
        if(dwAvailable == 0)
            return TRUE;

        vector<char> buffer(dwAvailable);
        DWORD dwDownloaded = 0;
        if(!WinHttpReadData(hRequest, &buffer[0], dwAvailable, &dwDownloaded))
            return FALSE;
        if(dwDownloaded == 0)
            return TRUE;

        // Append by explicit length so bodies containing NUL bytes are kept intact.
        body.append(&buffer[0], dwDownloaded);
    }
}

// Sends one request over the connection opened by the constructor.
//   Verb          - HTTP verb, e.g. "GET", "PUT", "POST", "DELETE"
//   relativeUri   - object name passed to WinHttpOpenRequest
//   dwStatusCode  - out: HTTP status code; reset to 0 before the request
//   httpResponse  - out: response body; cleared before the request
//   headers       - complete header lines ("Name: value") added to the request
//   Data          - request body; only sent when Verb is PUT or POST
// Returns TRUE when the request was sent and the status code and body were
// read. On failure it prints the Win32 error code and returns FALSE.
// NOTE(review): the request is always opened with WINHTTP_FLAG_SECURE.
BOOL WinHTTPREST::SendRequest(string Verb, string relativeUri, DWORD &dwStatusCode, string &httpResponse, vector<string> headers, string Data)
{
    // Start from a clean slate so a failed call cannot expose results of an
    // earlier request through the out-parameters.
    dwStatusCode = 0;
    httpResponse.clear();

    HINTERNET hRequest = NULL;
    if(_hConnect)
    {
        hRequest = WinHttpOpenRequest(_hConnect,
                                      StringToWString(Verb).c_str(),
                                      StringToWString(relativeUri).c_str(),
                                      StringToWString(_httpProtocol).c_str(),
                                      WINHTTP_NO_REFERER,
                                      WINHTTP_DEFAULT_ACCEPT_TYPES,
                                      WINHTTP_FLAG_SECURE );
    }

    BOOL bResults = (hRequest != NULL) ? TRUE : FALSE;

    // A header that cannot be added (e.g. Authorization) fails the whole request.
    for(vector<string>::size_type i = 0; bResults && i != headers.size(); ++i)
    {
        const std::wstring wideHeader = StringToWString(headers[i]);
        bResults = WinHttpAddRequestHeaders( hRequest,
                                             wideHeader.c_str(),
                                             static_cast<DWORD>(wideHeader.length()),
                                             WINHTTP_ADDREQ_FLAG_ADD );
    }

    if(bResults)
    {
        // Only PUT and POST carry a body; every verb is sent, including a
        // PUT/POST whose body is empty.
        LPVOID pBody = WINHTTP_NO_REQUEST_DATA;
        DWORD dwBodyLength = 0;
        if((Verb == "PUT" || Verb == "POST") && !Data.empty())
        {
            pBody = const_cast<char *>(Data.data());
            dwBodyLength = static_cast<DWORD>(Data.length());
        }
        bResults = WinHttpSendRequest(hRequest, WINHTTP_NO_ADDITIONAL_HEADERS, 0,
                                      pBody, dwBodyLength, dwBodyLength, 0 );
    }

    if( bResults )
        bResults = WinHttpReceiveResponse( hRequest, NULL);

    if(bResults)
    {
        DWORD dwSize = sizeof(dwStatusCode);
        bResults = WinHttpQueryHeaders( hRequest,
                                        WINHTTP_QUERY_STATUS_CODE | WINHTTP_QUERY_FLAG_NUMBER,
                                        WINHTTP_HEADER_NAME_BY_INDEX,
                                        &dwStatusCode,
                                        &dwSize,
                                        WINHTTP_NO_HEADER_INDEX );
    }

    if(bResults)
        bResults = ReadResponseBody(hRequest, httpResponse);

    // Capture the error code before closing the handle, which may overwrite it.
    const DWORD dwError = bResults ? ERROR_SUCCESS : GetLastError();

    if( hRequest )
        WinHttpCloseHandle( hRequest );

    if(bResults == FALSE)
        printf("Error %lu.\n", static_cast<unsigned long>(dwError));

    return bResults;
}

// Widens a narrow string one byte at a time: every byte becomes the wchar_t
// with the same value (0-255). No code-page or UTF-8 decoding is performed.
std::wstring WinHTTPREST::StringToWString(const std::string& s)
{
    std::wstring wide;
    wide.reserve(s.length());
    for(std::string::size_type i = 0; i < s.length(); ++i)
    {
        // Go through unsigned char so bytes >= 0x80 are not sign-extended
        // (e.g. 0xE9 must become 0x00E9, not 0xFFE9) where char is signed.
        wide.push_back(static_cast<wchar_t>(static_cast<unsigned char>(s[i])));
    }
    return wide;
}

// Narrows a wide string by converting each wchar_t to char individually.
// Characters that do not fit in a char are not preserved.
std::string WinHTTPREST::WStringToString(const std::wstring& s)
{
    std::string narrow;
    narrow.reserve(s.length());
    for(std::wstring::const_iterator it = s.begin(); it != s.end(); ++it)
    {
        narrow.push_back(static_cast<char>(*it));
    }
    return narrow;
}

Of course, what fun is C++ without a bunch of helper functions to "tweak" strings?

These examples were "borrowed" from http://www.codeguru.com/cpp/cpp/string/conversions/article.php/c12759.

[sourcecode language="CPP"]
// UrlEncodingHelpers.h
// Uri encode and decode.
// RFC1630, RFC1738, RFC2396

#include <string>
#include <assert.h>

// Maps a byte value to its hexadecimal digit value (0-15), or -1 when the byte
// is not a hex digit. Declared signed char so the -1 entries are valid on
// platforms where plain char is unsigned.
const signed char HEX2DEC[256] =
{
    /*       0  1  2  3   4  5  6  7   8  9  A  B   C  D  E  F */
    /* 0 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 1 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 2 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 3 */  0, 1, 2, 3,  4, 5, 6, 7,  8, 9,-1,-1, -1,-1,-1,-1,

    /* 4 */ -1,10,11,12, 13,14,15,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 5 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 6 */ -1,10,11,12, 13,14,15,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 7 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,

    /* 8 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* 9 */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* A */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* B */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,

    /* C */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* D */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* E */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,
    /* F */ -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1
};

// Decodes percent-escapes ("%XX") in sSrc and returns the decoded string.
// A '%' that is not followed by two hexadecimal digits is copied through
// unchanged (RFC1630 reserves such sequences for future extension).
// '+' is not translated to a space.
std::string UriDecode(const std::string & sSrc)
{
    const std::string::size_type srcLen = sSrc.length();

    std::string sResult;
    sResult.reserve(srcLen);   // decoding never lengthens the text

    std::string::size_type pos = 0;
    while (pos < srcLen)
    {
        // An escape needs the '%' plus two more characters.
        if (sSrc[pos] == '%' && srcLen - pos >= 3)
        {
            const int hi = HEX2DEC[static_cast<unsigned char>(sSrc[pos + 1])];
            const int lo = HEX2DEC[static_cast<unsigned char>(sSrc[pos + 2])];
            if (hi != -1 && lo != -1)
            {
                sResult += static_cast<char>((hi << 4) | lo);
                pos += 3;
                continue;
            }
        }

        sResult += sSrc[pos++];
    }

    return sResult;
}

// Only alphanum is safe: non-zero for '0'-'9', 'A'-'Z' and 'a'-'z', zero for
// every other byte value.
const char SAFE[256] =
{
    /*      0 1 2 3  4 5 6 7  8 9 A B  C D E F */
    /* 0 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* 1 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* 2 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* 3 */ 1,1,1,1, 1,1,1,1, 1,1,0,0, 0,0,0,0,

    /* 4 */ 0,1,1,1, 1,1,1,1, 1,1,1,1, 1,1,1,1,
    /* 5 */ 1,1,1,1, 1,1,1,1, 1,1,1,0, 0,0,0,0,
    /* 6 */ 0,1,1,1, 1,1,1,1, 1,1,1,1, 1,1,1,1,
    /* 7 */ 1,1,1,1, 1,1,1,1, 1,1,1,0, 0,0,0,0,

    /* 8 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* 9 */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* A */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* B */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,

    /* C */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* D */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* E */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0,
    /* F */ 0,0,0,0, 0,0,0,0, 0,0,0,0, 0,0,0,0
};

// Percent-encodes sSrc: bytes marked safe in SAFE are copied as-is, every
// other byte is written as '%' followed by two upper-case hex digits.
std::string UriEncode(const std::string & sSrc)
{
    static const char DEC2HEX[16 + 1] = "0123456789ABCDEF";

    // The result is built directly in a std::string, so nothing leaks if an
    // allocation throws and no int-sized length arithmetic is involved.
    std::string sResult;
    sResult.reserve(sSrc.length());

    for (std::string::size_type i = 0; i < sSrc.length(); ++i)
    {
        const unsigned char ch = static_cast<unsigned char>(sSrc[i]);
        if (SAFE[ch])
        {
            sResult += static_cast<char>(ch);
        }
        else
        {
            // escape this char
            sResult += '%';
            sResult += DEC2HEX[ch >> 4];
            sResult += DEC2HEX[ch & 0x0F];
        }
    }

    return sResult;
}

// Splits str at every character contained in delim and appends the non-empty
// pieces to results (existing elements of results are kept).
//   str     - text to split
//   delim   - set of delimiter characters; each one separates tokens
//   results - out: receives the tokens in order of appearance
void StringSplit(std::string str, std::string delim, std::vector<std::string> &results)
{
    // Walk the string with an index instead of repeatedly copying the tail,
    // and keep positions in size_type so the comparison with npos is exact.
    std::string::size_type tokenStart = 0;
    while (tokenStart < str.length())
    {
        const std::string::size_type cutAt = str.find_first_of(delim, tokenStart);
        if (cutAt == std::string::npos)
        {
            // No further delimiter: the rest of the string is the last token.
            results.push_back(str.substr(tokenStart));
            break;
        }

        // Skip empty tokens produced by leading or consecutive delimiters.
        if (cutAt > tokenStart)
        {
            results.push_back(str.substr(tokenStart, cutAt - tokenStart));
        }
        tokenStart = cutAt + 1;
    }
}

Next is the helper class that handles authentication with ACS (the Access Control Service):

[sourcecode language="cpp"]
#pragma once
#include "WinHTTPREST.h"
#include <Windows.h>
#include <string>
using namespace std;
// ACSAuthHelper.h
// Requests a WRAP access token from ACS for a service namespace and keeps the
// status code and body of the token request.
class ACSAuthHelper
{
    string _serviceNamespace;
    string _issuerName;
    string _issuerPassword;
    WinHTTPREST * _rest;        // owning pointer; deleted in the destructor
    DWORD _httpResponseCode;    // status code written by the token request
    string _httpResponse;       // body written by the token request

    // _rest is deleted in the destructor, so a copied ACSAuthHelper would
    // delete the same WinHTTPREST twice. Copying is disabled by declaring (and
    // never defining) the copy operations.
    ACSAuthHelper(const ACSAuthHelper &);
    ACSAuthHelper & operator=(const ACSAuthHelper &);
public:
    string GetHttpResponse();
    DWORD GetHTTPResponseCode();
    ACSAuthHelper(string serviceNamespace, string issuerName, string issuerPassword );
    ~ACSAuthHelper(void);

    // On success returns TRUE and fills in the token lifetime (seconds) and
    // the value to use for the Authorization header.
    BOOL GetAuthHeader(int &secondsUntilExpiration, string &AuthHeader);
};

[sourcecode language="cpp"]
#include "StdAfx.h"
#include "ACSAuthHelper.h"
#include "UrlEncodingHelpers.h"

// Stores the namespace and issuer credentials and creates a WinHTTPREST helper
// for "<namespace>.servicebus.appfabriclabs.com".
ACSAuthHelper::ACSAuthHelper(string serviceNamespace, string issuerName, string issuerPassword )
    : _serviceNamespace(serviceNamespace),
      _issuerName(issuerName),
      _issuerPassword(issuerPassword),
      _rest(NULL),
      _httpResponseCode(0)   // defined value before the first request is made
{
    const string serviceAddress = _serviceNamespace + ".servicebus.appfabriclabs.com";
    _rest = new WinHTTPREST(serviceAddress, TRUE);
}

// Destroys the WinHTTPREST helper created by the constructor.
ACSAuthHelper::~ACSAuthHelper(void)
{
    // Deleting a null pointer is a no-op, so no explicit guard is needed.
    delete _rest;
}

// Requests a WRAP access token from ACS and builds the Authorization value.
//   secondsUntilExpiration - out: value of wrap_access_token_expires_in
//   AuthHeader             - out: WRAP access_token="<decoded token>"
// Returns TRUE only when the POST succeeded with status 200 and both fields
// were present in the response; the out-parameters are untouched otherwise.
// The raw status code and body stay available through the accessors.
BOOL ACSAuthHelper::GetAuthHeader(int &secondsUntilExpiration, string &AuthHeader)
{
    const string relyingPartyAddress = "http://" + _serviceNamespace + ".servicebus.appfabriclabs.com";
    const string acsBaseAddress = _serviceNamespace + "-sb.accesscontrol.appfabriclabs.com";

    WinHTTPREST acsRest(acsBaseAddress, TRUE);

    // Build the ACS token request (form-encoded).
    const string postData = "wrap_scope=" + UriEncode(relyingPartyAddress)
        + "&wrap_name=" + UriEncode(_issuerName)
        + "&wrap_password=" + UriEncode(_issuerPassword);

    vector<string> headers;
    headers.push_back("Content-type: application/x-www-form-urlencoded");

    // Reset the stored response so an earlier call cannot be mistaken for this one.
    _httpResponse.clear();
    _httpResponseCode = 0;

    // Post the token request to the WRAP URI.
    const BOOL sent = acsRest.SendRequest("POST", "WRAPv0.9", _httpResponseCode, _httpResponse, headers, postData);
    if(sent == FALSE || _httpResponseCode != 200)
        return FALSE;

    // Parse the form-encoded response. Fields are matched by name, so the
    // result does not depend on the order in which the server lists them.
    const string tokenKey = "wrap_access_token=";
    const string expiresKey = "wrap_access_token_expires_in=";

    vector<string> fields;
    StringSplit(_httpResponse, "&", fields);

    string acsToken;
    int expiresIn = 0;
    bool haveToken = false;
    bool haveExpiry = false;
    for(vector<string>::size_type i = 0; i != fields.size(); ++i)
    {
        const string &field = fields[i];
        if(field.compare(0, tokenKey.length(), tokenKey) == 0)
        {
            acsToken = field.substr(tokenKey.length());
            haveToken = true;
        }
        else if(field.compare(0, expiresKey.length(), expiresKey) == 0)
        {
            expiresIn = atoi(field.substr(expiresKey.length()).c_str());
            haveExpiry = true;
        }
    }

    if(!haveToken || !haveExpiry)
        return FALSE;

    secondsUntilExpiration = expiresIn;
    AuthHeader = "WRAP access_token=\"" + UriDecode(acsToken) + "\"";
    return TRUE;
}

string ACSAuthHelper::GetHttpResponse()
{
return _httpResponse;
}

// Returns the status code currently held in _httpResponseCode.
DWORD ACSAuthHelper::GetHTTPResponseCode()
{
    return this->_httpResponseCode;
}

Finally, here is the program's entry point, which ties it all together.

[sourcecode language="cpp"]
// main.cpp: Defines the entry point for the console application.
//

#include "stdafx.h"
#include <Windows.h>
#include <winhttp.h>
#include <string>
#include <sstream>
#include <iomanip>
#include <time.h>
#include "WinHTTPREST.h"
#include "ACSAuthHelper.h"
#include "QueueClient.h"
using namespace std;

// Sample driver: authenticates against ACS, creates the queue "MyQueue"
// (a 409 "already exists" response is tolerated) and submits five messages.
// Returns 0 when authentication and queue creation succeed, 1 otherwise.
int _tmain(int argc, _TCHAR* argv[])
{
    int secondsUntilExpiration = 0;
    string authHeader = "";

    ACSAuthHelper acsAuth("yournamespace", "owner", "yourauthorizationcode");

    if(acsAuth.GetAuthHeader(secondsUntilExpiration, authHeader) == FALSE)
    {
        // printf needs a C string and an explicit width for DWORD: passing a
        // std::string object to "%s" is undefined behaviour.
        printf("Authentication to ACS Failed.\n");
        printf("Status Code: %lu, HTTP Response: %s\n",
               static_cast<unsigned long>(acsAuth.GetHTTPResponseCode()),
               acsAuth.GetHttpResponse().c_str());
        return 1;
    }

    QueueClient qc("yournamespace");
    if(qc.CreateQueue("MyQueue", authHeader) == FALSE)
    {
        if(qc.GetHTTPResponseCode() != 409) // already exists
        {
            printf("Queue Creation Failed.\n");
            printf("Status Code: %lu, HTTP Response: %s\n",
                   static_cast<unsigned long>(qc.GetHTTPResponseCode()),
                   qc.GetHttpResponse().c_str());
            return 1;
        }
    }

    for(int i=0; i < 5; i++)
    {
        // A failed submit is reported, but the remaining messages are still attempted.
        if(qc.SubmitMessageToQueue("MyQueue", authHeader, "Hello World Label!", "Hello World Message Body") == FALSE)
        {
            printf("Submitting to Queue Failed.\n");
            printf("Status Code: %lu, HTTP Response: %s\n",
                   static_cast<unsigned long>(qc.GetHTTPResponseCode()),
                   qc.GetHttpResponse().c_str());
        }
    }
    return 0;
}

To receive the messages, you can use the GetMessageFromQueue() method in C++, or, from managed code, the MessageReceiver/BrokeredMessage classes.
One thing to remember is that the C++ sender does NOT write managed (serialized .NET) strings; the body is raw bytes. To get the underlying data in managed code, read the body as a stream directly.

[sourcecode language="csharp"]
// Receives one message (waiting up to 10 seconds), prints its label and body,
// and completes it once the body has been read successfully.
public void ReceiveMessage()
{
    MessageReceiver receiver = _queueClient.CreateReceiver();

    BrokeredMessage receivedMessage = null;

    if (!receiver.TryReceive(TimeSpan.FromSeconds(10), out receivedMessage))
    {
        Console.WriteLine("No Messages To Process");
        return;
    }

    // The body is read as a raw stream; the using blocks dispose the stream
    // and reader even if reading throws.
    String msg;
    using (System.IO.Stream s = receivedMessage.GetBody<System.IO.Stream>())
    using (System.IO.StreamReader sr = new System.IO.StreamReader(s))
    {
        msg = sr.ReadToEnd();
    }

    Console.WriteLine("Label: " + receivedMessage.Label + " Body: " + msg);

    // Complete only after the body was read, so a failure above does not
    // acknowledge a message that was never processed.
    receivedMessage.Complete();
}