ns: Introduce lookup registration

Introduce a subscription based lookup model, in order for a client to be
able to listen to a future spawning service. The client can register any
number of queries with the ns and the ns will post lookup-results as
services are registred.

As a special case the list of lookup-results that is sent out as a
result of the registration of a new lookup will be finished by an empty
lookup response, allowing tools like qrtr-lookup to terminate as the
entire list of currently registed services is received.

Signed-off-by: Bjorn Andersson <bjorn.andersson@linaro.org>
This commit is contained in:
Bjorn Andersson 2017-06-06 20:51:59 -07:00
parent e377bc8e84
commit 326aad23ae
5 changed files with 196 additions and 5 deletions

View File

@ -150,6 +150,49 @@ int qrtr_bye(int sock, uint32_t service, uint16_t version, uint16_t instance)
return qrtr_remove_server(sock, service, version, instance);
}
int qrtr_new_lookup(int sock, uint32_t service, uint16_t version, uint16_t instance)
{
struct qrtr_ctrl_pkt pkt;
struct sockaddr_qrtr sq;
if (qrtr_getname(sock, &sq))
return -1;
memset(&pkt, 0, sizeof(pkt));
if (!sq.sq_port) {
LOGE("unable to register server on unbound port");
return -1;
}
pkt.cmd = cpu_to_le32(QRTR_CMD_NEW_LOOKUP);
pkt.server.service = cpu_to_le32(service);
pkt.server.instance = cpu_to_le32(instance << 16 | version);
pkt.server.node = cpu_to_le32(sq.sq_node);
pkt.server.port = cpu_to_le32(sq.sq_port);
return qrtr_sendto(sock, sq.sq_node, QRTR_CTRL_PORT, &pkt, sizeof(pkt));
}
int qrtr_remove_lookup(int sock, uint32_t service, uint16_t version, uint16_t instance)
{
struct qrtr_ctrl_pkt pkt;
struct sockaddr_qrtr sq;
if (qrtr_getname(sock, &sq))
return -1;
memset(&pkt, 0, sizeof(pkt));
pkt.cmd = cpu_to_le32(QRTR_CMD_DEL_LOOKUP);
pkt.server.service = cpu_to_le32(service);
pkt.server.instance = cpu_to_le32(instance << 16 | version);
pkt.server.node = cpu_to_le32(sq.sq_node);
pkt.server.port = cpu_to_le32(sq.sq_port);
return qrtr_sendto(sock, sq.sq_node, QRTR_CTRL_PORT, &pkt, sizeof(pkt));
}
int qrtr_poll(int sock, unsigned int ms)
{
struct pollfd fds;

View File

@ -10,9 +10,15 @@ int qrtr_sendto(int sock, uint32_t node, uint32_t port, const void *data, unsign
int qrtr_recvfrom(int sock, void *buf, unsigned int bsz, uint32_t *node, uint32_t *port);
int qrtr_recv(int sock, void *buf, unsigned int bsz);
int qrtr_new_server(int sock, uint32_t service, uint16_t version, uint16_t instance);
int qrtr_remove_server(int sock, uint32_t service, uint16_t version, uint16_t instance);
int qrtr_publish(int sock, uint32_t service, uint16_t version, uint16_t instance);
int qrtr_bye(int sock, uint32_t service, uint16_t version, uint16_t instance);
int qrtr_new_lookup(int sock, uint32_t service, uint16_t version, uint16_t instance);
int qrtr_remove_lookup(int sock, uint32_t service, uint16_t version, uint16_t instance);
int qrtr_poll(int sock, unsigned int ms);
int qrtr_lookup(int sock, uint32_t service, uint16_t version, uint16_t instance, uint32_t ifilter,
void (* cb)(void *,uint32_t,uint32_t,uint32_t,uint32_t), void *udata);

11
qrtr.py
View File

@ -53,6 +53,17 @@ class qrtr:
raise RuntimeError("remove_server failed")
self.service = None
def new_lookup(self, service, version, instance):
err = _qrtr.qrtr_new_lookup(self.sock, service, version, instance)
if err:
raise RuntimeError("new_lookup failed")
return (service, version, instance)
def remove_lookup(self, lookup):
err = _qrtr.qrtr_remove_lookup(self.sock, *lookup)
if err:
raise RuntimeError("remove_lookup failed")
def send(self, addr, data):
node, port = addr
n = _qrtr.qrtr_sendto(self.sock, node, port, c_char_p(data), len(data))

135
src/ns.c
View File

@ -26,9 +26,12 @@ static const char *ctrl_pkt_strings[] = {
[QRTR_CMD_RESUME_TX] = "resume-tx",
[QRTR_CMD_EXIT] = "exit",
[QRTR_CMD_PING] = "ping",
[QRTR_CMD_NEW_LOOKUP] = "new-lookup",
[QRTR_CMD_DEL_LOOKUP] = "del-lookup",
};
#define dprintf(...)
#define ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0]))
struct context {
int ctrl_sock;
@ -37,6 +40,8 @@ struct context {
int local_node;
struct sockaddr_qrtr bcast_sq;
struct list lookups;
};
struct server_filter {
@ -45,6 +50,14 @@ struct server_filter {
unsigned int ifilter;
};
struct lookup {
unsigned int service;
unsigned int instance;
struct sockaddr_qrtr sq;
struct list_item li;
};
struct server {
unsigned int service;
unsigned int instance;
@ -266,6 +279,27 @@ static struct server *server_del(unsigned int node_id, unsigned int port)
return srv;
}
static int lookup_notify(struct context *ctx, struct sockaddr_qrtr *to,
struct server *srv)
{
struct qrtr_ctrl_pkt pkt = {};
int rc;
pkt.cmd = QRTR_CMD_LOOKUP_RESULT;
if (srv) {
pkt.server.service = cpu_to_le32(srv->service);
pkt.server.instance = cpu_to_le32(srv->instance);
pkt.server.node = cpu_to_le32(srv->node);
pkt.server.port = cpu_to_le32(srv->port);
}
rc = sendto(ctx->ctrl_sock, &pkt, sizeof(pkt), 0,
(struct sockaddr *)to, sizeof(*to));
if (rc < 0)
warn("send lookup result failed");
return rc;
}
static int ctrl_cmd_hello(struct context *ctx, struct sockaddr_qrtr *sq,
const void *buf, size_t len)
{
@ -321,11 +355,26 @@ static int ctrl_cmd_del_client(struct context *ctx, unsigned node_id,
{
struct qrtr_ctrl_pkt pkt;
struct sockaddr_qrtr sq;
struct list_item *tmp;
struct lookup *lookup;
struct list_item *li;
struct map_entry *me;
struct server *srv;
struct node *node;
int rc;
/* Remove any lookups for this client */
list_for_each_safe(&ctx->lookups, li, tmp) {
lookup = container_of(li, struct lookup, li);
if (lookup->sq.sq_node != node_id)
continue;
if (lookup->sq.sq_port != port)
continue;
list_remove(&ctx->lookups, &lookup->li);
free(lookup);
}
/* Remove the server belonging to this port*/
srv = server_del(node_id, port);
if (srv) {
@ -364,6 +413,8 @@ static int ctrl_cmd_new_server(struct context *ctx, struct sockaddr_qrtr *from,
unsigned int service, unsigned int instance,
unsigned int node_id, unsigned int port)
{
struct lookup *lookup;
struct list_item *li;
struct server *srv;
int rc = 0;
@ -374,6 +425,16 @@ static int ctrl_cmd_new_server(struct context *ctx, struct sockaddr_qrtr *from,
if (srv->node == ctx->local_node)
rc = service_announce_new(ctx, &ctx->bcast_sq, srv);
list_for_each(&ctx->lookups, li) {
lookup = container_of(li, struct lookup, li);
if (lookup->service && lookup->service != service)
continue;
if (lookup->instance && lookup->instance != instance)
continue;
lookup_notify(ctx, &lookup->sq, srv);
}
return rc;
}
@ -396,6 +457,65 @@ static int ctrl_cmd_del_server(struct context *ctx, unsigned int service,
return rc;
}
static int ctrl_cmd_new_lookup(struct context *ctx, struct sockaddr_qrtr *from,
unsigned int service, unsigned int instance)
{
struct server_filter filter;
struct list reply_list;
struct lookup *lookup;
struct list_item *li;
struct server *srv;
lookup = calloc(1, sizeof(*lookup));
if (!lookup)
return -EINVAL;
lookup->sq = *from;
lookup->service = service;
lookup->instance = instance;
list_append(&ctx->lookups, &lookup->li);
memset(&filter, 0, sizeof(filter));
filter.service = service;
filter.instance = instance;
server_query(&filter, &reply_list);
list_for_each(&reply_list, li) {
srv = container_of(li, struct server, qli);
lookup_notify(ctx, from, srv);
}
lookup_notify(ctx, from, NULL);
return 0;
}
static int ctrl_cmd_del_lookup(struct context *ctx, struct sockaddr_qrtr *from,
unsigned int service, unsigned int instance)
{
struct lookup *lookup;
struct list_item *tmp;
struct list_item *li;
list_for_each_safe(&ctx->lookups, li, tmp) {
lookup = container_of(li, struct lookup, li);
if (lookup->sq.sq_node != from->sq_node)
continue;
if (lookup->sq.sq_port != from->sq_port)
continue;
if (lookup->service != service)
continue;
if (lookup->instance && lookup->instance != instance)
continue;
list_remove(&ctx->lookups, &lookup->li);
free(lookup);
}
return 0;
}
static void ctrl_port_fn(void *vcontext, struct waiter_ticket *tkt)
{
struct context *ctx = vcontext;
@ -418,14 +538,13 @@ static void ctrl_port_fn(void *vcontext, struct waiter_ticket *tkt)
}
msg = (void *)buf;
if (len < 4) {
warnx("short packet from %d:%d", sq.sq_node, sq.sq_port);
goto out;
}
cmd = le32_to_cpu(msg->cmd);
if (cmd <= _QRTR_CMD_MAX && ctrl_pkt_strings[cmd])
if (cmd < ARRAY_SIZE(ctrl_pkt_strings) && ctrl_pkt_strings[cmd])
dprintf("%s from %d:%d\n", ctrl_pkt_strings[cmd], sq.sq_node, sq.sq_port);
else
dprintf("UNK (%08x) from %d:%d\n", cmd, sq.sq_node, sq.sq_port);
@ -459,6 +578,16 @@ static void ctrl_port_fn(void *vcontext, struct waiter_ticket *tkt)
case QRTR_CMD_PING:
case QRTR_CMD_RESUME_TX:
break;
case QRTR_CMD_NEW_LOOKUP:
rc = ctrl_cmd_new_lookup(ctx, &sq,
le32_to_cpu(msg->server.service),
le32_to_cpu(msg->server.instance));
break;
case QRTR_CMD_DEL_LOOKUP:
rc = ctrl_cmd_del_lookup(ctx, &sq,
le32_to_cpu(msg->server.service),
le32_to_cpu(msg->server.instance));
break;
}
if (rc < 0)
@ -690,6 +819,8 @@ int main(int argc, char **argv)
if (w == NULL)
errx(1, "unable to create waiter");
list_init(&ctx.lookups);
rc = map_create(&nodes);
if (rc)
errx(1, "unable to create node map");

View File

@ -58,9 +58,9 @@ enum ctrl_pkt_cmd {
QRTR_CMD_RESUME_TX = 7,
QRTR_CMD_EXIT = 8,
QRTR_CMD_PING = 9,
_QRTR_CMD_CNT,
_QRTR_CMD_MAX = _QRTR_CMD_CNT - 1
QRTR_CMD_NEW_LOOKUP = 10,
QRTR_CMD_DEL_LOOKUP = 11,
QRTR_CMD_LOOKUP_RESULT = 12,
};
struct qrtr_ctrl_pkt {