XRootD
Loading...
Searching...
No Matches
XrdNetPMarkFF.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d N e t P M a r k C f g . h h */
4/* */
5/* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdint>
32#include <stdio.h>
33#include <stdlib.h>
34#include <string.h>
35#include <time.h>
36#include <unistd.h>
37#include <sys/socket.h>
38#include <sys/time.h>
39#include <sys/types.h>
40
41#include "Xrd/XrdScheduler.hh"
43#include "XrdNet/XrdNetMsg.hh"
45#include "XrdNet/XrdNetUtils.hh"
46#include "XrdSys/XrdSysError.hh"
47#include "XrdSys/XrdSysTrace.hh"
48
49/******************************************************************************/
50/* L o c a l M a c r o s */
51/******************************************************************************/
52
53#define TRACE(txt) if (doTrace) SYSTRACE(Trace->, tident, epName, 0, txt)
54
55#define DEBUG(txt) if (doDebug) SYSTRACE(Trace->, tident, epName, 0, txt)
56
57#define EPName(ep) const char *epName = ep
58
59/******************************************************************************/
60/* F i r e f l y P a c k e t T e m p l a t e */
61/******************************************************************************/
62
63namespace
64{
65const char *ffFmt0 =
66"<134>1 - %s xrootd - firefly-json - " //RFC5424 syslog header (abbreviated)
67"{"
68 "\"version\":1,"
69 "\"flow-lifecycle\":{"
70 "\"state\":\"%%s\"," //-> start | ongoing | end
71 "\"current-time\":\"%%s\"," //-> yyyy-mm-ddThh:mm:ss.uuuuuu+00:00
72 "\"start-time\":\"%s\""
73 "%%s" //-> ,"end-time":"<date-time>"
74 "},"
75 "\"usage\":{\"received\":%%llu,\"sent\":%%llu},"
76 "\"netlink\":{\"rtt\":%%u.%%.03u},";
77
78const char *ffFmt1 =
79 "\"context\":{"
80 "\"experiment-id\":%d,"
81 "\"activity-id\":%d"
82 "%s" //-> ,application:<appname>
83 "},";
84
85const char *ffFmt2 =
86 "\"flow-id\":{"
87 "\"afi\":\"ipv%c\"," //-> ipv4 | ipv6
88 "\"src-ip\":\"%s\"," // source client for put o/w server
89 "\"dst-ip\":\"%s\"," // dest server for put o/w client
90 "\"protocol\":\"tcp\","
91 "\"src-port\":%d,"
92 "\"dst-port\":%d"
93 "}"
94"}";
95
96const char *ffApp = ",\"application\":\"%.*s\"";
97
98const char *ffEnd = ",\"end-time\":\"%s\"";
99}
100
101/******************************************************************************/
102/* s t a t i c O b j e c t s */
103/******************************************************************************/
104
105namespace XrdNetPMarkConfig
106{
107
108// Other configuration values
109//
110extern XrdSysError *eDest;
111extern XrdNetMsg *netMsg;
112extern XrdNetMsg *netOrg;
113extern XrdScheduler *Sched;
114extern XrdSysTrace *Trace;
115
116extern char *ffDest;
117extern int ffPortO;
118extern int ffEcho;
119extern bool doDebug;
120extern bool doTrace;
121
122extern const char *myHostName;
123}
124using namespace XrdNetPMarkConfig;
125
126/******************************************************************************/
127/* T h r e a d I n t e r f a c e s */
128/******************************************************************************/
129/*
130namespace
131{
132void *Refresh(void *carg)
133 {int intvl = *(int *)carg;
134 while(true) {XrdSysTimer::Snooze(intvl); XrdNetPMarkCfg::Ping();}
135 }
136XrdSysMutex ffMutex;
137}
138*/
139
140/******************************************************************************/
141/* Private: E m i t */
142/******************************************************************************/
143
144bool XrdNetPMarkFF::Emit(const char *state, const char *cT, const char *eT)
145{
146 EPName("Emit");
147 struct sockStats ss;
148 char msgBuff[1024];
149
150 SockStats(ss);
151
152// Note that the supplier of the data is the source. Hence, for put requests
153// the client is designated as the source o/w it is the server. So, on a
154// put request the number of bytes we recived is the number of bytes the
155// source (i.e. client) sent. Note the temlate is "usage: recv sent".
156//
157 int n;
158 if (appName && !strcmp(appName, "http-put"))
159 {n = snprintf(msgBuff, sizeof(msgBuff), ffHdr, state, cT, eT,
160 ss.bSent, ss.bRecv, ss.msRTT, ss.usRTT);
161 } else {
162 n = snprintf(msgBuff, sizeof(msgBuff), ffHdr, state, cT, eT,
163 ss.bRecv, ss.bSent, ss.msRTT, ss.usRTT);
164 }
165
166 if (n + ffTailsz >= (int)sizeof(msgBuff))
167 {eDest->Emsg("PMarkFF", "invalid json; msgBuff truncated.");
168 fdOK = odOK = false;
169 return false;
170 }
171
172 memcpy(msgBuff+n, ffTail, ffTailsz+1);
173
174 if (fdOK)
175 {DEBUG("Sending pmark s-msg: " <<msgBuff);
176 if (netMsg->Send(msgBuff, n+ffTailsz) < 0)
177 {fdOK = false;
178 return false;
179 }
180 }
181
182 if (odOK)
183 {DEBUG("Sending pmark o-msg: " <<(netMsg ? "=s-msg" : msgBuff));
184 if (netOrg->Send(oDest, *mySad, msgBuff, n+ffTailsz) < 0)
185 {odOK = false;
186 return false;
187 }
188 }
189
190 return true;
191}
192
193/******************************************************************************/
194/* Private: g e t U T C */
195/******************************************************************************/
196
197const char *XrdNetPMarkFF::getUTC(char *utcBuff, int utcBLen)
198{
199 struct timeval tod;
200 struct tm utcDT;
201 char *bP;
202
203// Get the current time in UTC
204//
205 gettimeofday(&tod, 0);
206 gmtime_r(&tod.tv_sec, &utcDT);
207
208// Format this ISO-style
209//
210 size_t n = strftime(utcBuff, utcBLen, "%FT%T", &utcDT);
211 bP = utcBuff + n; utcBLen -= n;
212 snprintf(bP, utcBLen, ".%06u+00:00", static_cast<unsigned int>(tod.tv_usec));
213
214// Return result
215//
216 return utcBuff;
217}
218
219/******************************************************************************/
220/* P i n g */
221/******************************************************************************/
222/*
223void XrdNetPMarkCfg::Ping()
224{
225// Tell every registered task to send out a continuation
226//
227 ffMutex.Lock();
228 for (std::set<XdNetPMarkFF*> it = ffTasks.begin(); it!= ffTasks.end(); it++)
229???
230 ffMutex.UnLock();
231}
232*/
233/******************************************************************************/
234/* R e g i s t r y */
235/******************************************************************************/
236/*
237XrdNetMsg *XrdNetPMarkCfg::netMsg = 0;
238std::set<XrdNetPMarkFF*> XrdNetPMarkCfg::ffTasks;
239
240void XrdNetPMarkCfg::Registry(XrdNetPMarkFF *ffobj, bool doadd)
241{
242// Add or delete ityem from task list
243//
244 ffMutex.Lock();
245 if (doadd) ffTasks.insert(ffObj);
246 else ffTasks.erase(ffObj);
247 ffMutex.UnLock();
248}
249
250// This is firefly so we must get a netmsg object
251//
252 bool aOK;
253 netMsg = new XrdNetMsg(eLog, ffDest, aOK);
254 if (!aOK)
255 {eLog->Emsg("Config", "Unable to create UDP tunnel to", ffDest);
256 return 0;
257 }
258
259// If there is an interval, start a thread to handle continuations
260//
261 if (ffIntvl && XrdSysThread::Run(&tid,Refresh,(void *)&ffIntvl,0,"pmark")
262 {eDest->Emsg(epname, errno, "start pmark refresh timer");
263 return 0;
264 }
265*/
266
267/******************************************************************************/
268/* D e s t r u c t o r */
269/******************************************************************************/
270
272{
273// If all is well, emit the closing message
274//
275 if (fdOK || odOK)
276 {char utcBuff[40], endBuff[80];
277 snprintf(endBuff, sizeof(endBuff), ffEnd,
278 getUTC(utcBuff, sizeof(utcBuff)));
279 Emit("end", utcBuff, endBuff);
280 }
281
282// Cleanup
283//
284 if (mySad) delete(mySad);
285 if (oDest) free(oDest);
286 if (ffHdr) free(ffHdr);
287 if (ffTail) free(ffTail);
288 if (xtraFH) delete xtraFH;
289};
290
291/******************************************************************************/
292/* S o c k S t a t s */
293/******************************************************************************/
294
295#ifdef __linux__
296#include <linux/tcp.h>
297#endif
298
299void XrdNetPMarkFF::SockStats(struct sockStats &ss)
300{
301#ifndef __linux__
302 memset(&ss, 0, sizeof(struct sockStats));
303#else
304 EPName("SockStats");
305 struct tcp_info tcpInfo;
306 socklen_t tiLen = sizeof(tcpInfo);
307
308// The data returned is from the server's perspective. This must be
309// resolved by the caller as to which perspective should be presented.
310// Note that for put requests the source is the client.
311//
312 if (getsockopt(sockFD, IPPROTO_TCP, TCP_INFO, (void *)&tcpInfo, &tiLen) == 0)
313 {ss.bRecv = static_cast<uint64_t>(tcpInfo.tcpi_bytes_received);
314 ss.bSent = static_cast<uint64_t>(tcpInfo.tcpi_bytes_acked);
315 ss.msRTT = static_cast<uint32_t>(tcpInfo.tcpi_rtt/1000);
316 ss.usRTT = static_cast<uint32_t>(tcpInfo.tcpi_rtt%1000);
317 } else {
318 memset(&ss, 0, sizeof(struct sockStats));
319 DEBUG("Unable to get TCP information errno=" << strerror(errno));
320 }
321#endif
322}
323
324/******************************************************************************/
325/* S t a r t */
326/******************************************************************************/
327
329{
330 char appInfo[128], clIP[INET6_ADDRSTRLEN+2], svIP[INET6_ADDRSTRLEN+2];
331 int clPort, svPort;
332 char clType, svType;
333 bool fdok = false, odok = false;
334
335// Preform app if we need to
336//
337 if (!appName) *appInfo = 0;
338 else snprintf(appInfo,sizeof(appInfo),ffApp,sizeof(appInfo)-20,appName);
339
340// Get the file descriptor for the socket
341//
342 sockFD = addr.SockFD();
343
344// Obtain connectivity information about the peer and ourselves. We really
345// should obtain our external address and use that but the issue is that
346// we may have multiple external addresses and the client determines which
347// one actually gets used. So, it's complicated. A TODO.
348//
349 clPort = XrdNetUtils::GetSokInfo( sockFD, clIP, sizeof(clIP), clType);
350 if (clPort < 0)
351 {eDest->Emsg("PMarkFF", clPort, "get peer information.");
352 return false;
353 }
354
355 svPort = XrdNetUtils::GetSokInfo(-sockFD, svIP, sizeof(svIP), svType);
356 if (svPort < 0)
357 {eDest->Emsg("PMarkFF", clPort, "get self information.");
358 return false;
359 }
360
361// If there is no special collector, indicate so
362//
363 if (netMsg) fdok = true;
364
365// If the messages need to flow to the origin, get the destination information
366//
367 if (netOrg)
368 {const XrdNetSockAddr *urSad = addr.NetAddr();
369 if (!urSad) eDest->Emsg("PMarkFF", "unable to get origin address.");
370 else {char buff[1024];
371 mySad = new XrdNetSockAddr;
372 memcpy(mySad, urSad, sizeof(XrdNetSockAddr));
373 mySad->v4.sin_port = htons(static_cast<uint16_t>(ffPortO));
374 snprintf(buff, sizeof(buff), "%s:%d", clIP, ffPortO);
375 oDest = strdup(buff);
376 odok = true;
377 }
378 }
379
380// If we cannot report anywhere then indicate we failed
381//
382 if (!fdok && !odok) return false;
383
384// Format the base firefly template. Note that the client determines the
385// address family that is being used.
386//
387 char utcBuff[40], bseg0[512];
388 int len0 = snprintf(bseg0, sizeof(bseg0), ffFmt0, myHostName,
389 getUTC(utcBuff, sizeof(utcBuff)));
390 if (len0 >= (int)sizeof(bseg0))
391 {eDest->Emsg("PMarkFF", "invalid json; bseg0 truncated.");
392 return false;
393 }
394
395 ffHdr = strdup(bseg0);
396
397 char bseg1[256];
398 int len1 = snprintf(bseg1, sizeof(bseg1), ffFmt1, eCode, aCode, appInfo);
399 if (len1 >= (int)sizeof(bseg1))
400 {eDest->Emsg("PMarkFF", "invalid json; bseg1 truncated.");
401 return false;
402 }
403
404// Note that by convention FF packets the supplier of the data is designated
405// as the source. We only know this at this point for http requests and even
406// then it's hardly accurate. So, for put requests the src if the client.
407// Ottherwise, we designate the server as the source.
408//
409 char bseg2[256];
410 int len2;
411 if (appName && !strcmp(appName, "http-put"))
412 {len2 = snprintf(bseg2, sizeof(bseg2), ffFmt2,
413 clType, clIP, svIP, clPort, svPort);
414 } else {
415 len2 = snprintf(bseg2, sizeof(bseg2), ffFmt2,
416 clType, svIP, clIP, svPort, clPort);
417 }
418 if (len2 >= (int)sizeof(bseg2))
419 {eDest->Emsg("PMarkFF", "invalid json; cl bseg2 truncated.");
420 return false;
421 }
422
423 ffTailsz = len1 + len2;
424 ffTail = (char *)malloc(ffTailsz + 1);
425 strcpy(ffTail, bseg1);
426 strcpy(ffTail+len1, bseg2);
427
428// OK, we now can emit the starting packet
429//
430 fdOK = fdok;
431 odOK = odok;
432 return Emit("start", utcBuff, "");
433}
#define DEBUG(x)
static XrdSysError eDest(0,"crypto_")
#define EPName(ep)
#define IPPROTO_TCP
const XrdNetSockAddr * NetAddr()
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
Definition XrdNetMsg.cc:70
bool Start(XrdNetAddrInfo &addr)
virtual ~XrdNetPMarkFF()
static int GetSokInfo(int fd, char *theAddr, int theALen, char &theType)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysTrace * Trace
XrdScheduler * Sched
XrdSysError * eDest
const char * myHostName