-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathazure.go
241 lines (192 loc) · 7.05 KB
/
azure.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package queue
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
// AzureQueue is a queue backed by an Azure Service Bus Queue. Its methods are safe for use by multiple goroutines.
type AzureQueue struct {
// namespace is the Service Bus namespace passed to NewAzureQueue; it is also baked into url.
namespace string
// saKey is the shared access key name, sent as the "skn" field of the Authorization header.
saKey string
// saValue holds the raw bytes of the shared access key value and is used as the HMAC-SHA256 key when signing.
saValue []byte
// url is the base URL of the queue, built from azureQueueURL; it ends with a slash.
url string
// client performs every HTTP request issued by the queue's methods.
client *http.Client
}
// azureQueueURL is the format string for a queue's base URL. NewAzureQueue fills in the namespace and the queue path, in that order.
const azureQueueURL = "https://%s.servicebus.windows.net:443/%s/"
// NewAzureQueue builds an AzureQueue for the queue at queuePath inside the given Service Bus namespace.
// The meaning of the parameters is described in the MSDN docs at:
// https://msdn.microsoft.com/en-us/library/azure/dn798895.aspx
//
// sharedAccessKeyValue is stored verbatim (as bytes) and later used as the signing key; it is NOT base64-decoded.
func NewAzureQueue(namespace string, sharedAccessKeyName string, sharedAccessKeyValue string, queuePath string) *AzureQueue {
	queue := new(AzureQueue)
	queue.namespace = namespace
	queue.saKey = sharedAccessKeyName
	queue.saValue = []byte(sharedAccessKeyValue)
	queue.url = fmt.Sprintf(azureQueueURL, namespace, queuePath)
	queue.client = new(http.Client)
	return queue
}
// request creates a body-less HTTP request for the given URL and method and attaches
// an Authorization header whose signature expiry is derived from the current time.
func (aq *AzureQueue) request(url string, method string) (*http.Request, error) {
	r, err := http.NewRequest(method, url, nil)
	if err != nil {
		return nil, err
	}
	expiry := aq.signatureExpiry(time.Now())
	authorization := aq.authHeader(url, expiry)
	r.Header.Set("Authorization", authorization)
	return r, nil
}
// requestWithBody creates an HTTP request for the given URL and method that carries body as its
// payload, and attaches an Authorization header whose signature expiry is derived from the current time.
func (aq *AzureQueue) requestWithBody(url string, method string, body []byte) (*http.Request, error) {
	payload := bytes.NewReader(body)
	r, err := http.NewRequest(method, url, payload)
	if err != nil {
		return nil, err
	}
	expiry := aq.signatureExpiry(time.Now())
	authorization := aq.authHeader(url, expiry)
	r.Header.Set("Authorization", authorization)
	return r, nil
}
// Succeed confirms that the request has been processed and permanently removes it from the queue.
// It issues a DELETE for the message identified by item.ID and item.LockToken.
//
// It returns nil when the service answers 200 OK. Any other status is reported as an error
// that contains the status code and the response body.
//
// For more information see https://msdn.microsoft.com/en-us/library/azure/hh780768.aspx.
func (aq *AzureQueue) Succeed(item *Item) error {
	req, err := aq.request(aq.url+"messages/"+item.ID+"/"+item.LockToken, "DELETE")
	if err != nil {
		return err
	}
	resp, err := aq.client.Do(req)
	if err != nil {
		return err
	}
	// Close the body on every path, including success; otherwise the underlying
	// connection is leaked and can never be reused.
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusOK {
		return nil
	}
	// Best effort: the body only enriches the error message, so a read failure is ignored.
	b, _ := ioutil.ReadAll(resp.Body)
	return fmt.Errorf("Got error code %v with body %s", resp.StatusCode, string(b))
}
// Send enqueues a new item by POSTing item.Request as the message body.
//
// It returns nil when the service answers 200 OK or 201 Created. Any other status is
// reported as an error that contains the status code and the response body.
//
// For more information see https://msdn.microsoft.com/en-us/library/azure/hh780737.aspx.
func (aq *AzureQueue) Send(item *Item) error {
	req, err := aq.requestWithBody(aq.url+"messages/", "POST", item.Request)
	if err != nil {
		return err
	}
	resp, err := aq.client.Do(req)
	if err != nil {
		return err
	}
	// Close the body on every path, including success; otherwise the underlying
	// connection is leaked and can never be reused.
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated {
		return nil
	}
	// Best effort: the body only enriches the error message, so a read failure is ignored.
	b, _ := ioutil.ReadAll(resp.Body)
	return fmt.Errorf("Got error code %v with body %s", resp.StatusCode, string(b))
}
// Fail unlocks the request for processing by other workers. It should be used if the worker could not complete the request. In particular,
// because the lock token eventually expires, workers should not implement much (if any) retry logic as the token might expire before the
// retried code succeeds, leading to a request being processed multiple times.
//
// It issues a PUT for the message identified by item.ID and item.LockToken and returns nil when the
// service answers 200 OK. Any other status is reported as an error that contains the status code and
// the response body.
//
// For more information see https://msdn.microsoft.com/en-us/library/azure/hh780737.aspx.
// NOTE(review): this is the same link that Send cites — confirm it points at the unlock-message documentation.
func (aq *AzureQueue) Fail(item *Item) error {
	req, err := aq.request(aq.url+"messages/"+item.ID+"/"+item.LockToken, "PUT")
	if err != nil {
		return err
	}
	resp, err := aq.client.Do(req)
	if err != nil {
		return err
	}
	// Close the body on every path, including success; otherwise the underlying
	// connection is leaked and can never be reused.
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusOK {
		return nil
	}
	// Best effort: the body only enriches the error message, so a read failure is ignored.
	b, _ := ioutil.ReadAll(resp.Body)
	return fmt.Errorf("Got error code %v with body %s", resp.StatusCode, string(b))
}
// Next retrieves the next item from the queue by POSTing to messages/head with a 60 second
// server-side timeout.
//
// Return values:
//   - (item, nil) when a message was received; ID and LockToken come from the BrokerProperties
//     response header and Request is the response body.
//   - (nil, nil) when the service answers 204 No Content, i.e. there was no message to hand out.
//   - (nil, err) when the request fails, the service answers with a 4xx/5xx status, or the
//     response cannot be decoded.
//
// For more information see https://msdn.microsoft.com/en-us/library/azure/hh780722.aspx.
func (aq *AzureQueue) Next() (*Item, error) {
	req, err := aq.request(aq.url+"messages/head?timeout=60", "POST")
	if err != nil {
		return nil, err
	}
	resp, err := aq.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusNoContent:
		// Empty queue: there is no BrokerProperties header to parse, so report "no item" instead of a decode error.
		return nil, nil
	case resp.StatusCode >= http.StatusBadRequest:
		// Surface service errors the same way the other methods do rather than as a confusing JSON error.
		// Best effort: the body only enriches the error message, so a read failure is ignored.
		b, _ := ioutil.ReadAll(resp.Body)
		return nil, fmt.Errorf("Got error code %v with body %s", resp.StatusCode, string(b))
	}

	// The message metadata travels in a JSON-encoded response header.
	var props map[string]interface{}
	if err := json.Unmarshal([]byte(resp.Header.Get("BrokerProperties")), &props); err != nil {
		return nil, fmt.Errorf("Error unmarshalling BrokerProperties: %v", err)
	}
	messageID, ok := props["MessageId"].(string)
	if !ok {
		return nil, fmt.Errorf("BrokerProperties did not include MessageId or it was not a string")
	}
	lockToken, ok := props["LockToken"].(string)
	if !ok {
		return nil, fmt.Errorf("BrokerProperties did not include LockToken or it was not a string")
	}

	message, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("Error reading message body: %v", err)
	}
	return &Item{ID: messageID, LockToken: lockToken, Request: message}, nil
}
// signatureExpiry returns the expiry for the shared access signature for the next request:
// the Unix timestamp, in decimal, of the instant 300 seconds after from, rounded to the nearest second.
//
// It's translated from the Python client:
// https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py
func (aq *AzureQueue) signatureExpiry(from time.Time) string {
	t := from.Add(300 * time.Second).Round(time.Second).Unix()
	// Format the int64 directly; converting to int first would truncate the timestamp on 32-bit platforms.
	return strconv.FormatInt(t, 10)
}
// signatureURI returns the canonical URI according to Azure specs: the query-escaped form of uri, lower-cased.
//
// It's translated from the Python client:
// https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py
//
// Caveat: Python's urllib.quote and Go's url.QueryEscape behave differently, so this may not
// match the Python client's output for every input.
func (aq *AzureQueue) signatureURI(uri string) string {
	escaped := url.QueryEscape(uri)
	return strings.ToLower(escaped)
}
// stringToSign returns the string to sign: uri and expiry separated by a single newline.
//
// It's translated from the Python client:
// https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py
func (aq *AzureQueue) stringToSign(uri string, expiry string) string {
	parts := []string{uri, expiry}
	return strings.Join(parts, "\n")
}
// signString returns the HMAC signed string: the HMAC-SHA256 of s keyed with the queue's
// shared access key value, base64-encoded (standard alphabet) and then query-escaped.
//
// It's translated from the Python client:
// https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/_common_conversion.py
func (aq *AzureQueue) signString(s string) string {
	mac := hmac.New(sha256.New, aq.saValue)
	mac.Write([]byte(s))
	digest := mac.Sum(nil)
	return url.QueryEscape(base64.StdEncoding.EncodeToString(digest))
}
// authHeader returns the value of the Authorization header for requests to Azure Service Bus.
// The header carries the signature (sig), the expiry (se), the shared access key name (skn)
// and the canonical resource URI (sr); the signature covers the canonical URI and the expiry.
//
// It's translated from the Python client:
// https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py
func (aq *AzureQueue) authHeader(uri string, expiry string) string {
	resource := aq.signatureURI(uri)
	signature := aq.signString(aq.stringToSign(resource, expiry))
	return fmt.Sprintf("SharedAccessSignature sig=%s&se=%s&skn=%s&sr=%s", signature, expiry, aq.saKey, resource)
}